| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.federation.router; |
| |
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; |
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; |
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; |
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT; |
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY; |
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_DEFAULT; |
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY; |
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT; |
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY; |
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE; |
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT; |
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION; |
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT; |
| import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption; |
| import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.lang.reflect.Array; |
| import java.net.ConnectException; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.EnumSet; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.HAUtil; |
| import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder; |
| import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader; |
| import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache; |
| import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; |
| import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; |
| import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; |
| import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.crypto.CryptoProtocolVersion; |
| import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; |
| import org.apache.hadoop.fs.CacheFlag; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.fs.ContentSummary; |
| import org.apache.hadoop.fs.CreateFlag; |
| import org.apache.hadoop.fs.FileAlreadyExistsException; |
| import org.apache.hadoop.fs.FsServerDefaults; |
| import org.apache.hadoop.fs.Options; |
| import org.apache.hadoop.fs.QuotaUsage; |
| import org.apache.hadoop.fs.StorageType; |
| import org.apache.hadoop.fs.XAttr; |
| import org.apache.hadoop.fs.XAttrSetFlag; |
| import org.apache.hadoop.fs.permission.AclEntry; |
| import org.apache.hadoop.fs.permission.AclStatus; |
| import org.apache.hadoop.fs.permission.FsAction; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.ha.HAServiceProtocol; |
| import org.apache.hadoop.hdfs.AddBlockFlag; |
| import org.apache.hadoop.hdfs.DFSUtil; |
| import org.apache.hadoop.hdfs.inotify.EventBatchList; |
| import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; |
| import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing; |
| import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; |
| import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; |
| import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; |
| import org.apache.hadoop.hdfs.protocol.CachePoolEntry; |
| import org.apache.hadoop.hdfs.protocol.CachePoolInfo; |
| import org.apache.hadoop.hdfs.protocol.ClientProtocol; |
| import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; |
| import org.apache.hadoop.hdfs.protocol.DatanodeID; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.DirectoryListing; |
| import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; |
| import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult; |
| import org.apache.hadoop.hdfs.protocol.EncryptionZone; |
| import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; |
| import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; |
| import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; |
| import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; |
| import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlocks; |
| import org.apache.hadoop.hdfs.protocol.OpenFileEntry; |
| import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; |
| import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; |
| import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; |
| import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; |
| import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing; |
| import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; |
| import org.apache.hadoop.hdfs.protocol.SnapshotStatus; |
| import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; |
| import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService; |
| import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol; |
| import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; |
| import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB; |
| import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; |
| import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB; |
| import org.apache.hadoop.hdfs.protocolPB.RouterPolicyProvider; |
| import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; |
| import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; |
| import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; |
| import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; |
| import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; |
| import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; |
| import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; |
| import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; |
| import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; |
| import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; |
| import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; |
| import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; |
| import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager; |
| import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; |
| import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; |
| import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; |
| import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; |
| import org.apache.hadoop.hdfs.server.namenode.SafeModeException; |
| import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; |
| import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; |
| import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; |
| import org.apache.hadoop.io.EnumSetWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.ipc.ProtobufRpcEngine2; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.ipc.RPC.Server; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.ipc.RetriableException; |
| import org.apache.hadoop.ipc.StandbyException; |
| import org.apache.hadoop.net.NodeBase; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.security.RefreshUserMappingsProtocol; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos; |
| import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB; |
| import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.tools.GetUserMappingsProtocol; |
| import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler; |
| import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos; |
| import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; |
| import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.classification.VisibleForTesting; |
| import org.apache.hadoop.thirdparty.protobuf.BlockingService; |
| |
| /** |
| * This class is responsible for handling all of the RPC calls to the It is |
| * created, started, and stopped by {@link Router}. It implements the |
| * {@link ClientProtocol} to mimic a |
| * {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode} and proxies |
| * the requests to the active |
| * {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode}. |
| */ |
| public class RouterRpcServer extends AbstractService implements ClientProtocol, |
| NamenodeProtocol, RefreshUserMappingsProtocol, GetUserMappingsProtocol { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(RouterRpcServer.class); |
| |
| |
| /** Configuration for the RPC server. */ |
| private Configuration conf; |
| |
| /** Router using this RPC server. */ |
| private final Router router; |
| |
| /** The RPC server that listens to requests from clients. */ |
| private final Server rpcServer; |
| /** The address for this RPC server. */ |
| private final InetSocketAddress rpcAddress; |
| |
| /** RPC clients to connect to the Namenodes. */ |
| private final RouterRpcClient rpcClient; |
| |
| /** Monitor metrics for the RPC calls. */ |
| private final RouterRpcMonitor rpcMonitor; |
| |
| /** If we use authentication for the connections. */ |
| private final boolean serviceAuthEnabled; |
| |
| |
| /** Interface to identify the active NN for a nameservice or blockpool ID. */ |
| private final ActiveNamenodeResolver namenodeResolver; |
| |
| /** Interface to map global name space to HDFS subcluster name spaces. */ |
| private final FileSubclusterResolver subclusterResolver; |
| |
| /** Category of the operation that a thread is executing. */ |
| private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>(); |
| |
| // Modules implementing groups of RPC calls |
| /** Router Quota calls. */ |
| private final Quota quotaCall; |
| /** NamenodeProtocol calls. */ |
| private final RouterNamenodeProtocol nnProto; |
| /** ClientProtocol calls. */ |
| private final RouterClientProtocol clientProto; |
| /** Other protocol calls. */ |
| private final RouterUserProtocol routerProto; |
| /** Router security manager to handle token operations. */ |
| private RouterSecurityManager securityManager = null; |
| /** Super user credentials that a thread may use. */ |
| private static final ThreadLocal<UserGroupInformation> CUR_USER = |
| new ThreadLocal<>(); |
| |
| /** DN type -> full DN report. */ |
| private final LoadingCache<DatanodeReportType, DatanodeInfo[]> dnCache; |
| |
| /** Specify the option of router federation rename. */ |
| private RouterRenameOption routerRenameOption; |
| /** Schedule the router federation rename jobs. */ |
| private BalanceProcedureScheduler fedRenameScheduler; |
| /** |
| * Construct a router RPC server. |
| * |
| * @param conf HDFS Configuration. |
| * @param router A router using this RPC server. |
| * @param nnResolver The NN resolver instance to determine active NNs in HA. |
| * @param fileResolver File resolver to resolve file paths to subclusters. |
| * @throws IOException If the RPC server could not be created. |
| */ |
| public RouterRpcServer(Configuration conf, Router router, |
| ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver) |
| throws IOException { |
| super(RouterRpcServer.class.getName()); |
| |
| this.conf = conf; |
| this.router = router; |
| this.namenodeResolver = nnResolver; |
| this.subclusterResolver = fileResolver; |
| |
| // RPC server settings |
| int handlerCount = this.conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, |
| DFS_ROUTER_HANDLER_COUNT_DEFAULT); |
| |
| int readerCount = this.conf.getInt(DFS_ROUTER_READER_COUNT_KEY, |
| DFS_ROUTER_READER_COUNT_DEFAULT); |
| |
| int handlerQueueSize = this.conf.getInt(DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY, |
| DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT); |
| |
| // Override Hadoop Common IPC setting |
| int readerQueueSize = this.conf.getInt(DFS_ROUTER_READER_QUEUE_SIZE_KEY, |
| DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT); |
| this.conf.setInt( |
| CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, |
| readerQueueSize); |
| |
| RPC.setProtocolEngine(this.conf, ClientNamenodeProtocolPB.class, |
| ProtobufRpcEngine2.class); |
| |
| ClientNamenodeProtocolServerSideTranslatorPB |
| clientProtocolServerTranslator = |
| new ClientNamenodeProtocolServerSideTranslatorPB(this); |
| BlockingService clientNNPbService = ClientNamenodeProtocol |
| .newReflectiveBlockingService(clientProtocolServerTranslator); |
| |
| NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = |
| new NamenodeProtocolServerSideTranslatorPB(this); |
| BlockingService nnPbService = NamenodeProtocolService |
| .newReflectiveBlockingService(namenodeProtocolXlator); |
| |
| RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator = |
| new RefreshUserMappingsProtocolServerSideTranslatorPB(this); |
| BlockingService refreshUserMappingService = |
| RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService. |
| newReflectiveBlockingService(refreshUserMappingXlator); |
| |
| GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = |
| new GetUserMappingsProtocolServerSideTranslatorPB(this); |
| BlockingService getUserMappingService = |
| GetUserMappingsProtocolProtos.GetUserMappingsProtocolService. |
| newReflectiveBlockingService(getUserMappingXlator); |
| |
| InetSocketAddress confRpcAddress = conf.getSocketAddr( |
| RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, |
| RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, |
| RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_DEFAULT, |
| RBFConfigKeys.DFS_ROUTER_RPC_PORT_DEFAULT); |
| LOG.info("RPC server binding to {} with {} handlers for Router {}", |
| confRpcAddress, handlerCount, this.router.getRouterId()); |
| |
| // Create security manager |
| this.securityManager = new RouterSecurityManager(this.conf); |
| RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf); |
| |
| this.rpcServer = new RPC.Builder(this.conf) |
| .setProtocol(ClientNamenodeProtocolPB.class) |
| .setInstance(clientNNPbService) |
| .setBindAddress(confRpcAddress.getHostName()) |
| .setPort(confRpcAddress.getPort()) |
| .setNumHandlers(handlerCount) |
| .setnumReaders(readerCount) |
| .setQueueSizePerHandler(handlerQueueSize) |
| .setVerbose(false) |
| .setAlignmentContext(routerStateIdContext) |
| .setSecretManager(this.securityManager.getSecretManager()) |
| .build(); |
| |
| // Add all the RPC protocols that the Router implements |
| DFSUtil.addPBProtocol( |
| conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer); |
| DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, |
| refreshUserMappingService, this.rpcServer); |
| DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, |
| getUserMappingService, this.rpcServer); |
| |
| // Set service-level authorization security policy |
| this.serviceAuthEnabled = conf.getBoolean( |
| HADOOP_SECURITY_AUTHORIZATION, false); |
| if (this.serviceAuthEnabled) { |
| rpcServer.refreshServiceAcl(conf, new RouterPolicyProvider()); |
| } |
| |
| // We don't want the server to log the full stack trace for some exceptions |
| this.rpcServer.addTerseExceptions( |
| RemoteException.class, |
| SafeModeException.class, |
| FileNotFoundException.class, |
| FileAlreadyExistsException.class, |
| AccessControlException.class, |
| LeaseExpiredException.class, |
| NotReplicatedYetException.class, |
| IOException.class, |
| ConnectException.class, |
| RetriableException.class); |
| |
| this.rpcServer.addSuppressedLoggingExceptions( |
| StandbyException.class); |
| |
| // The RPC-server port can be ephemeral... ensure we have the correct info |
| InetSocketAddress listenAddress = this.rpcServer.getListenerAddress(); |
| this.rpcAddress = new InetSocketAddress( |
| confRpcAddress.getHostName(), listenAddress.getPort()); |
| |
| if (conf.getBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, |
| RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE_DEFAULT)) { |
| // Create metrics monitor |
| Class<? extends RouterRpcMonitor> rpcMonitorClass = this.conf.getClass( |
| RBFConfigKeys.DFS_ROUTER_METRICS_CLASS, |
| RBFConfigKeys.DFS_ROUTER_METRICS_CLASS_DEFAULT, |
| RouterRpcMonitor.class); |
| this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, conf); |
| } else { |
| this.rpcMonitor = null; |
| } |
| |
| // Create the client |
| this.rpcClient = new RouterRpcClient(this.conf, this.router, |
| this.namenodeResolver, this.rpcMonitor, routerStateIdContext); |
| |
| // Initialize modules |
| this.quotaCall = new Quota(this.router, this); |
| this.nnProto = new RouterNamenodeProtocol(this); |
| this.clientProto = new RouterClientProtocol(conf, this); |
| this.routerProto = new RouterUserProtocol(this); |
| |
| long dnCacheExpire = conf.getTimeDuration( |
| DN_REPORT_CACHE_EXPIRE, |
| DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS); |
| this.dnCache = CacheBuilder.newBuilder() |
| .build(new DatanodeReportCacheLoader()); |
| |
| // Actively refresh the dn cache in a configured interval |
| Executors |
| .newSingleThreadScheduledExecutor() |
| .scheduleWithFixedDelay(() -> this.dnCache |
| .asMap() |
| .keySet() |
| .parallelStream() |
| .forEach(this.dnCache::refresh), |
| 0, |
| dnCacheExpire, TimeUnit.MILLISECONDS); |
| initRouterFedRename(); |
| } |
| |
| /** |
| * Init the router federation rename environment. Each router has its own |
| * journal path. |
| * In HA mode the journal path is: |
| * JOURNAL_BASE/nsId/namenodeId |
| * e.g. |
| * /journal/router-namespace/host0 |
| * In non-ha mode the journal path is based on ip and port: |
| * JOURNAL_BASE/host_port |
| * e.g. |
| * /journal/0.0.0.0_8888 |
| */ |
| private void initRouterFedRename() throws IOException { |
| routerRenameOption = RouterRenameOption.valueOf( |
| conf.get(DFS_ROUTER_FEDERATION_RENAME_OPTION, |
| DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT).toUpperCase()); |
| switch (routerRenameOption) { |
| case DISTCP: |
| RouterFederationRename.checkConfiguration(conf); |
| Configuration sConf = new Configuration(conf); |
| URI journalUri; |
| try { |
| journalUri = new URI(sConf.get(SCHEDULER_JOURNAL_URI)); |
| } catch (URISyntaxException | NullPointerException e) { |
| throw new IOException("Bad journal uri. Please check configuration for " |
| + SCHEDULER_JOURNAL_URI); |
| } |
| Path child; |
| String nsId = DFSUtil.getNamenodeNameServiceId(conf); |
| String namenodeId = HAUtil.getNameNodeId(conf, nsId); |
| InetSocketAddress listenAddress = this.rpcServer.getListenerAddress(); |
| if (nsId == null || namenodeId == null) { |
| child = new Path( |
| listenAddress.getHostName() + "_" + listenAddress.getPort()); |
| } else { |
| child = new Path(nsId, namenodeId); |
| } |
| String routerJournal = new Path(journalUri.toString(), child).toString(); |
| sConf.set(SCHEDULER_JOURNAL_URI, routerJournal); |
| fedRenameScheduler = new BalanceProcedureScheduler(sConf); |
| fedRenameScheduler.init(true); |
| break; |
| case NONE: |
| fedRenameScheduler = null; |
| break; |
| default: |
| break; |
| } |
| } |
| |
| @Override |
| protected void serviceInit(Configuration configuration) throws Exception { |
| this.conf = configuration; |
| |
| if (this.rpcMonitor == null) { |
| LOG.info("Do not start Router RPC metrics"); |
| } else { |
| this.rpcMonitor.init(this.conf, this, this.router.getStateStore()); |
| } |
| |
| super.serviceInit(configuration); |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| if (this.rpcServer != null) { |
| this.rpcServer.start(); |
| LOG.info("Router RPC up at: {}", this.getRpcAddress()); |
| } |
| super.serviceStart(); |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| if (this.rpcServer != null) { |
| this.rpcServer.stop(); |
| } |
| if (rpcMonitor != null) { |
| this.rpcMonitor.close(); |
| } |
| if (securityManager != null) { |
| this.securityManager.stop(); |
| } |
| if (this.fedRenameScheduler != null) { |
| fedRenameScheduler.shutDown(); |
| } |
| super.serviceStop(); |
| } |
| |
| boolean isEnableRenameAcrossNamespace() { |
| return routerRenameOption != RouterRenameOption.NONE; |
| } |
| |
| BalanceProcedureScheduler getFedRenameScheduler() { |
| return this.fedRenameScheduler; |
| } |
| |
| /** |
| * Get the RPC security manager. |
| * |
| * @return RPC security manager. |
| */ |
| public RouterSecurityManager getRouterSecurityManager() { |
| return this.securityManager; |
| } |
| |
| /** |
| * Get the RPC client to the Namenode. |
| * |
| * @return RPC clients to the Namenodes. |
| */ |
| public RouterRpcClient getRPCClient() { |
| return rpcClient; |
| } |
| |
| /** |
| * Get the subcluster resolver. |
| * |
| * @return Subcluster resolver. |
| */ |
| public FileSubclusterResolver getSubclusterResolver() { |
| return subclusterResolver; |
| } |
| |
| /** |
| * Get the active namenode resolver. |
| * |
| * @return Active namenode resolver. |
| */ |
| public ActiveNamenodeResolver getNamenodeResolver() { |
| return namenodeResolver; |
| } |
| |
| /** |
| * Get the RPC monitor and metrics. |
| * |
| * @return RPC monitor and metrics. |
| */ |
| public RouterRpcMonitor getRPCMonitor() { |
| return rpcMonitor; |
| } |
| |
| /** |
| * Allow access to the client RPC server for testing. |
| * |
| * @return The RPC server. |
| */ |
| @VisibleForTesting |
| public Server getServer() { |
| return rpcServer; |
| } |
| |
| /** |
| * Get the RPC address of the service. |
| * |
| * @return RPC service address. |
| */ |
| public InetSocketAddress getRpcAddress() { |
| return rpcAddress; |
| } |
| |
| /** |
| * Check if the Router is in safe mode. We should only see READ, WRITE, and |
| * UNCHECKED. It includes a default handler when we haven't implemented an |
| * operation. If not supported, it always throws an exception reporting the |
| * operation. |
| * |
| * @param op Category of the operation to check. |
| * @param supported If the operation is supported or not. If not, it will |
| * throw an UnsupportedOperationException. |
| * @throws StandbyException If the Router is in safe mode and cannot serve |
| * client requests. |
| * @throws UnsupportedOperationException If the operation is not supported. |
| */ |
| void checkOperation(OperationCategory op, boolean supported) |
| throws StandbyException, UnsupportedOperationException { |
| checkOperation(op); |
| |
| if (!supported) { |
| if (rpcMonitor != null) { |
| rpcMonitor.proxyOpNotImplemented(); |
| } |
| String methodName = getMethodName(); |
| throw new UnsupportedOperationException( |
| "Operation \"" + methodName + "\" is not supported"); |
| } |
| } |
| |
| /** |
| * Check if the Router is in safe mode. We should only see READ, WRITE, and |
| * UNCHECKED. This function should be called by all ClientProtocol functions. |
| * |
| * @param op Category of the operation to check. |
| * @throws StandbyException If the Router is in safe mode and cannot serve |
| * client requests. |
| */ |
| void checkOperation(OperationCategory op) |
| throws StandbyException { |
| // Log the function we are currently calling. |
| if (rpcMonitor != null) { |
| rpcMonitor.startOp(); |
| } |
| // Log the function we are currently calling. |
| if (LOG.isDebugEnabled()) { |
| String methodName = getMethodName(); |
| LOG.debug("Proxying operation: {}", methodName); |
| } |
| |
| // Store the category of the operation category for this thread |
| opCategory.set(op); |
| |
| // We allow unchecked and read operations to try, fail later |
| if (op == OperationCategory.UNCHECKED || op == OperationCategory.READ) { |
| return; |
| } |
| checkSafeMode(); |
| } |
| |
| /** |
| * Check if the Router is in safe mode. |
| * @throws StandbyException If the Router is in safe mode and cannot serve |
| * client requests. |
| */ |
| private void checkSafeMode() throws StandbyException { |
| if (isSafeMode()) { |
| // Throw standby exception, router is not available |
| if (rpcMonitor != null) { |
| rpcMonitor.routerFailureSafemode(); |
| } |
| OperationCategory op = opCategory.get(); |
| throw new StandbyException("Router " + router.getRouterId() + |
| " is in safe mode and cannot handle " + op + " requests"); |
| } |
| } |
| |
| /** |
| * Return true if the Router is in safe mode. |
| * |
| * @return true if the Router is in safe mode. |
| */ |
| boolean isSafeMode() { |
| RouterSafemodeService safemodeService = router.getSafemodeService(); |
| return (safemodeService != null && safemodeService.isInSafeMode()); |
| } |
| |
| /** |
| * Get the name of the method that is calling this function. |
| * |
| * @return Name of the method calling this function. |
| */ |
| static String getMethodName() { |
| final StackTraceElement[] stack = Thread.currentThread().getStackTrace(); |
| String methodName = stack[3].getMethodName(); |
| return methodName; |
| } |
| |
| /** |
| * Invokes the method at default namespace, if default namespace is not |
| * available then at the other available namespaces. |
| * If the namespace is unavailable, retry with other namespaces. |
| * @param <T> expected return type. |
| * @param method the remote method. |
| * @return the response received after invoking method. |
| * @throws IOException |
| */ |
| <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz) |
| throws IOException { |
| String nsId = subclusterResolver.getDefaultNamespace(); |
| Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); |
| // If no namespace is available, then throw this IOException. |
| IOException io = new IOException("No namespace available."); |
| // If default Ns is present return result from that namespace. |
| if (!nsId.isEmpty()) { |
| try { |
| return rpcClient.invokeSingle(nsId, method, clazz); |
| } catch (IOException ioe) { |
| if (!clientProto.isUnavailableSubclusterException(ioe)) { |
| LOG.debug("{} exception cannot be retried", |
| ioe.getClass().getSimpleName()); |
| throw ioe; |
| } |
| // Remove the already tried namespace. |
| nss.removeIf(n -> n.getNameserviceId().equals(nsId)); |
| return invokeOnNs(method, clazz, io, nss); |
| } |
| } |
| return invokeOnNs(method, clazz, io, nss); |
| } |
| |
| /** |
| * Invoke the method sequentially on available namespaces, |
| * throw no namespace available exception, if no namespaces are available. |
| * @param method the remote method. |
| * @param clazz Class for the return type. |
| * @param ioe IOException . |
| * @param nss List of name spaces in the federation |
| * @return the response received after invoking method. |
| * @throws IOException |
| */ |
| <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe, |
| Set<FederationNamespaceInfo> nss) throws IOException { |
| if (nss.isEmpty()) { |
| throw ioe; |
| } |
| for (FederationNamespaceInfo fnInfo : nss) { |
| String nsId = fnInfo.getNameserviceId(); |
| LOG.debug("Invoking {} on namespace {}", method, nsId); |
| try { |
| return rpcClient.invokeSingle(nsId, method, clazz); |
| } catch (IOException e) { |
| LOG.debug("Failed to invoke {} on namespace {}", method, nsId, e); |
| // Ignore the exception and try on other namespace, if the tried |
| // namespace is unavailable, else throw the received exception. |
| if (!clientProto.isUnavailableSubclusterException(e)) { |
| throw e; |
| } |
| } |
| } |
| // Couldn't get a response from any of the namespace, throw ioe. |
| throw ioe; |
| } |
| |
| @Override // ClientProtocol |
| public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) |
| throws IOException { |
| return clientProto.getDelegationToken(renewer); |
| } |
| |
| @Override // ClientProtocol |
| public long renewDelegationToken(Token<DelegationTokenIdentifier> token) |
| throws IOException { |
| return clientProto.renewDelegationToken(token); |
| } |
| |
| @Override // ClientProtocol |
| public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) |
| throws IOException { |
| clientProto.cancelDelegationToken(token); |
| } |
| |
| @Override // ClientProtocol |
| public LocatedBlocks getBlockLocations(String src, final long offset, |
| final long length) throws IOException { |
| return clientProto.getBlockLocations(src, offset, length); |
| } |
| |
| @Override // ClientProtocol |
| public FsServerDefaults getServerDefaults() throws IOException { |
| return clientProto.getServerDefaults(); |
| } |
| |
| @Override // ClientProtocol |
| public HdfsFileStatus create(String src, FsPermission masked, |
| String clientName, EnumSetWritable<CreateFlag> flag, |
| boolean createParent, short replication, long blockSize, |
| CryptoProtocolVersion[] supportedVersions, String ecPolicyName, |
| String storagePolicy) |
| throws IOException { |
| return clientProto.create(src, masked, clientName, flag, createParent, |
| replication, blockSize, supportedVersions, ecPolicyName, storagePolicy); |
| } |
| |
| |
| /** |
| * Get the location to create a file. It checks if the file already existed |
| * in one of the locations. |
| * |
| * @param src Path of the file to check. |
| * @return The remote location for this file. |
| * @throws IOException If the file has no creation location. |
| */ |
| RemoteLocation getCreateLocation(final String src) throws IOException { |
| final List<RemoteLocation> locations = getLocationsForPath(src, true); |
| return getCreateLocation(src, locations); |
| } |
| |
| /** |
| * Get the location to create a file. It checks if the file already existed |
| * in one of the locations. |
| * |
| * @param src Path of the file to check. |
| * @param locations Prefetched locations for the file. |
| * @return The remote location for this file. |
| * @throws IOException If the file has no creation location. |
| */ |
| RemoteLocation getCreateLocation( |
| final String src, final List<RemoteLocation> locations) |
| throws IOException { |
| |
| if (locations == null || locations.isEmpty()) { |
| throw new IOException("Cannot get locations to create " + src); |
| } |
| |
| RemoteLocation createLocation = locations.get(0); |
| if (locations.size() > 1) { |
| try { |
| RemoteLocation existingLocation = getExistingLocation(src, locations); |
| // Forward to the existing location and let the NN handle the error |
| if (existingLocation != null) { |
| LOG.debug("{} already exists in {}.", src, existingLocation); |
| createLocation = existingLocation; |
| } |
| } catch (FileNotFoundException fne) { |
| // Ignore if the file is not found |
| } |
| } |
| return createLocation; |
| } |
| |
| /** |
| * Gets the remote location where the file exists. |
| * @param src the name of file. |
| * @param locations all the remote locations. |
| * @return the remote location of the file if it exists, else null. |
| * @throws IOException in case of any exception. |
| */ |
| private RemoteLocation getExistingLocation(String src, |
| List<RemoteLocation> locations) throws IOException { |
| RemoteMethod method = new RemoteMethod("getFileInfo", |
| new Class<?>[] {String.class}, new RemoteParam()); |
| Map<RemoteLocation, HdfsFileStatus> results = rpcClient.invokeConcurrent( |
| locations, method, true, false, HdfsFileStatus.class); |
| for (RemoteLocation loc : locations) { |
| if (results.get(loc) != null) { |
| return loc; |
| } |
| } |
| return null; |
| } |
| |
| @Override // ClientProtocol |
| public LastBlockWithStatus append(String src, final String clientName, |
| final EnumSetWritable<CreateFlag> flag) throws IOException { |
| return clientProto.append(src, clientName, flag); |
| } |
| |
| @Override // ClientProtocol |
| public boolean recoverLease(String src, String clientName) |
| throws IOException { |
| return clientProto.recoverLease(src, clientName); |
| } |
| |
| @Override // ClientProtocol |
| public boolean setReplication(String src, short replication) |
| throws IOException { |
| return clientProto.setReplication(src, replication); |
| } |
| |
| @Override // ClientProtocol |
| public void setStoragePolicy(String src, String policyName) |
| throws IOException { |
| clientProto.setStoragePolicy(src, policyName); |
| } |
| |
| @Override // ClientProtocol |
| public BlockStoragePolicy[] getStoragePolicies() throws IOException { |
| return clientProto.getStoragePolicies(); |
| } |
| |
| @Override // ClientProtocol |
| public void setPermission(String src, FsPermission permissions) |
| throws IOException { |
| clientProto.setPermission(src, permissions); |
| } |
| |
| @Override // ClientProtocol |
| public void setOwner(String src, String username, String groupname) |
| throws IOException { |
| clientProto.setOwner(src, username, groupname); |
| } |
| |
| /** |
| * Excluded and favored nodes are not verified and will be ignored by |
| * placement policy if they are not in the same nameservice as the file. |
| */ |
| @Override // ClientProtocol |
| public LocatedBlock addBlock(String src, String clientName, |
| ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId, |
| String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags) |
| throws IOException { |
| return clientProto.addBlock(src, clientName, previous, excludedNodes, |
| fileId, favoredNodes, addBlockFlags); |
| } |
| |
| /** |
| * Excluded nodes are not verified and will be ignored by placement if they |
| * are not in the same nameservice as the file. |
| */ |
| @Override // ClientProtocol |
| public LocatedBlock getAdditionalDatanode(final String src, final long fileId, |
| final ExtendedBlock blk, final DatanodeInfo[] existings, |
| final String[] existingStorageIDs, final DatanodeInfo[] excludes, |
| final int numAdditionalNodes, final String clientName) |
| throws IOException { |
| return clientProto.getAdditionalDatanode(src, fileId, blk, existings, |
| existingStorageIDs, excludes, numAdditionalNodes, clientName); |
| } |
| |
| @Override // ClientProtocol |
| public void abandonBlock(ExtendedBlock b, long fileId, String src, |
| String holder) throws IOException { |
| clientProto.abandonBlock(b, fileId, src, holder); |
| } |
| |
| @Override // ClientProtocol |
| public boolean complete(String src, String clientName, ExtendedBlock last, |
| long fileId) throws IOException { |
| return clientProto.complete(src, clientName, last, fileId); |
| } |
| |
| @Override // ClientProtocol |
| public LocatedBlock updateBlockForPipeline( |
| ExtendedBlock block, String clientName) throws IOException { |
| return clientProto.updateBlockForPipeline(block, clientName); |
| } |
| |
| /** |
| * Datanode are not verified to be in the same nameservice as the old block. |
| * TODO This may require validation. |
| */ |
| @Override // ClientProtocol |
| public void updatePipeline(String clientName, ExtendedBlock oldBlock, |
| ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs) |
| throws IOException { |
| clientProto.updatePipeline(clientName, oldBlock, newBlock, newNodes, |
| newStorageIDs); |
| } |
| |
| @Override // ClientProtocol |
| public long getPreferredBlockSize(String src) throws IOException { |
| return clientProto.getPreferredBlockSize(src); |
| } |
| |
| @Deprecated |
| @Override // ClientProtocol |
| public boolean rename(final String src, final String dst) |
| throws IOException { |
| return clientProto.rename(src, dst); |
| } |
| |
| @Override // ClientProtocol |
| public void rename2(final String src, final String dst, |
| final Options.Rename... options) throws IOException { |
| clientProto.rename2(src, dst, options); |
| } |
| |
| @Override // ClientProtocol |
| public void concat(String trg, String[] src) throws IOException { |
| clientProto.concat(trg, src); |
| } |
| |
| @Override // ClientProtocol |
| public boolean truncate(String src, long newLength, String clientName) |
| throws IOException { |
| return clientProto.truncate(src, newLength, clientName); |
| } |
| |
| @Override // ClientProtocol |
| public boolean delete(String src, boolean recursive) throws IOException { |
| return clientProto.delete(src, recursive); |
| } |
| |
| @Override // ClientProtocol |
| public boolean mkdirs(String src, FsPermission masked, boolean createParent) |
| throws IOException { |
| return clientProto.mkdirs(src, masked, createParent); |
| } |
| |
| @Override // ClientProtocol |
| public void renewLease(String clientName, List<String> namespaces) |
| throws IOException { |
| clientProto.renewLease(clientName, namespaces); |
| } |
| |
| @Override // ClientProtocol |
| public DirectoryListing getListing(String src, byte[] startAfter, |
| boolean needLocation) throws IOException { |
| return clientProto.getListing(src, startAfter, needLocation); |
| } |
| |
| @Override |
| public BatchedDirectoryListing getBatchedListing( |
| String[] srcs, byte[] startAfter, boolean needLocation) |
| throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override // ClientProtocol |
| public HdfsFileStatus getFileInfo(String src) throws IOException { |
| return clientProto.getFileInfo(src); |
| } |
| |
| @Override // ClientProtocol |
| public boolean isFileClosed(String src) throws IOException { |
| return clientProto.isFileClosed(src); |
| } |
| |
| @Override // ClientProtocol |
| public HdfsFileStatus getFileLinkInfo(String src) throws IOException { |
| return clientProto.getFileLinkInfo(src); |
| } |
| |
| @Override // ClientProtocol |
| public HdfsLocatedFileStatus getLocatedFileInfo(String src, |
| boolean needBlockToken) throws IOException { |
| return clientProto.getLocatedFileInfo(src, needBlockToken); |
| } |
| |
| @Override // ClientProtocol |
| public long[] getStats() throws IOException { |
| return clientProto.getStats(); |
| } |
| |
| @Override // ClientProtocol |
| public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) |
| throws IOException { |
| return clientProto.getDatanodeReport(type); |
| } |
| |
| /** |
| * Get the datanode report from cache. |
| * |
| * @param type Type of the datanode. |
| * @return List of datanodes. |
| * @throws IOException If it cannot get the report. |
| */ |
| DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type) |
| throws IOException { |
| try { |
| DatanodeInfo[] dns = this.dnCache.get(type); |
| if (dns == null) { |
| LOG.debug("Get null DN report from cache"); |
| dns = getCachedDatanodeReportImpl(type); |
| this.dnCache.put(type, dns); |
| } |
| return dns; |
| } catch (ExecutionException e) { |
| LOG.error("Cannot get the DN report for {}", type, e); |
| Throwable cause = e.getCause(); |
| if (cause instanceof IOException) { |
| throw (IOException) cause; |
| } else { |
| throw new IOException(cause); |
| } |
| } |
| } |
| |
| private DatanodeInfo[] getCachedDatanodeReportImpl( |
| final DatanodeReportType type) throws IOException { |
| // We need to get the DNs as a privileged user |
| UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); |
| RouterRpcServer.setCurrentUser(loginUser); |
| |
| try { |
| DatanodeInfo[] dns = clientProto.getDatanodeReport(type); |
| LOG.debug("Refresh cached DN report with {} datanodes", dns.length); |
| return dns; |
| } finally { |
| // Reset ugi to remote user for remaining operations. |
| RouterRpcServer.resetCurrentUser(); |
| } |
| } |
| |
| /** |
| * Get the datanode report with a timeout. |
| * @param type Type of the datanode. |
| * @param requireResponse If we require all the namespaces to report. |
| * @param timeOutMs Time out for the reply in milliseconds. |
| * @return List of datanodes. |
| * @throws IOException If it cannot get the report. |
| */ |
| public DatanodeInfo[] getDatanodeReport( |
| DatanodeReportType type, boolean requireResponse, long timeOutMs) |
| throws IOException { |
| checkOperation(OperationCategory.UNCHECKED); |
| |
| Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>(); |
| RemoteMethod method = new RemoteMethod("getDatanodeReport", |
| new Class<?>[] {DatanodeReportType.class}, type); |
| |
| Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); |
| Map<FederationNamespaceInfo, DatanodeInfo[]> results = |
| rpcClient.invokeConcurrent(nss, method, requireResponse, false, |
| timeOutMs, DatanodeInfo[].class); |
| updateDnMap(results, datanodesMap); |
| // Map -> Array |
| Collection<DatanodeInfo> datanodes = datanodesMap.values(); |
| return toArray(datanodes, DatanodeInfo.class); |
| } |
| |
| @Override // ClientProtocol |
| public DatanodeStorageReport[] getDatanodeStorageReport( |
| DatanodeReportType type) throws IOException { |
| return clientProto.getDatanodeStorageReport(type); |
| } |
| |
| /** |
| * Get the list of datanodes per subcluster. |
| * |
| * @param type Type of the datanodes to get. |
| * @return nsId to datanode list. |
| * @throws IOException If the method cannot be invoked remotely. |
| */ |
| public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap( |
| DatanodeReportType type) throws IOException { |
| return getDatanodeStorageReportMap(type, true, -1); |
| } |
| |
| /** |
| * Get the list of datanodes per subcluster. |
| * |
| * @param type Type of the datanodes to get. |
| * @param requireResponse If true an exception will be thrown if all calls do |
| * not complete. If false exceptions are ignored and all data results |
| * successfully received are returned. |
| * @param timeOutMs Time out for the reply in milliseconds. |
| * @return nsId to datanode list. |
| * @throws IOException If the method cannot be invoked remotely. |
| */ |
| public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap( |
| DatanodeReportType type, boolean requireResponse, long timeOutMs) |
| throws IOException { |
| |
| Map<String, DatanodeStorageReport[]> ret = new LinkedHashMap<>(); |
| RemoteMethod method = new RemoteMethod("getDatanodeStorageReport", |
| new Class<?>[] {DatanodeReportType.class}, type); |
| Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); |
| Map<FederationNamespaceInfo, DatanodeStorageReport[]> results = |
| rpcClient.invokeConcurrent( |
| nss, method, requireResponse, false, timeOutMs, DatanodeStorageReport[].class); |
| for (Entry<FederationNamespaceInfo, DatanodeStorageReport[]> entry : |
| results.entrySet()) { |
| FederationNamespaceInfo ns = entry.getKey(); |
| String nsId = ns.getNameserviceId(); |
| DatanodeStorageReport[] result = entry.getValue(); |
| ret.put(nsId, result); |
| } |
| return ret; |
| } |
| |
| @Override // ClientProtocol |
| public boolean setSafeMode(SafeModeAction action, boolean isChecked) |
| throws IOException { |
| return clientProto.setSafeMode(action, isChecked); |
| } |
| |
| @Override // ClientProtocol |
| public boolean restoreFailedStorage(String arg) throws IOException { |
| return clientProto.restoreFailedStorage(arg); |
| } |
| |
| @Override // ClientProtocol |
| public boolean saveNamespace(long timeWindow, long txGap) throws IOException { |
| return clientProto.saveNamespace(timeWindow, txGap); |
| } |
| |
| @Override // ClientProtocol |
| public long rollEdits() throws IOException { |
| return clientProto.rollEdits(); |
| } |
| |
| @Override // ClientProtocol |
| public void refreshNodes() throws IOException { |
| clientProto.refreshNodes(); |
| } |
| |
| @Override // ClientProtocol |
| public void finalizeUpgrade() throws IOException { |
| clientProto.finalizeUpgrade(); |
| } |
| |
| @Override // ClientProtocol |
| public boolean upgradeStatus() throws IOException { |
| return clientProto.upgradeStatus(); |
| } |
| |
| @Override // ClientProtocol |
| public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) |
| throws IOException { |
| return clientProto.rollingUpgrade(action); |
| } |
| |
| @Override // ClientProtocol |
| public void metaSave(String filename) throws IOException { |
| clientProto.metaSave(filename); |
| } |
| |
| @Override // ClientProtocol |
| public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) |
| throws IOException { |
| return clientProto.listCorruptFileBlocks(path, cookie); |
| } |
| |
| @Override // ClientProtocol |
| public void setBalancerBandwidth(long bandwidth) throws IOException { |
| clientProto.setBalancerBandwidth(bandwidth); |
| } |
| |
| @Override // ClientProtocol |
| public ContentSummary getContentSummary(String path) throws IOException { |
| return clientProto.getContentSummary(path); |
| } |
| |
| @Override // ClientProtocol |
| public void fsync(String src, long fileId, String clientName, |
| long lastBlockLength) throws IOException { |
| clientProto.fsync(src, fileId, clientName, lastBlockLength); |
| } |
| |
| @Override // ClientProtocol |
| public void setTimes(String src, long mtime, long atime) throws IOException { |
| clientProto.setTimes(src, mtime, atime); |
| } |
| |
| @Override // ClientProtocol |
| public void createSymlink(String target, String link, FsPermission dirPerms, |
| boolean createParent) throws IOException { |
| clientProto.createSymlink(target, link, dirPerms, createParent); |
| } |
| |
| @Override // ClientProtocol |
| public String getLinkTarget(String path) throws IOException { |
| return clientProto.getLinkTarget(path); |
| } |
| |
| @Override // ClientProtocol |
| public void allowSnapshot(String snapshotRoot) throws IOException { |
| clientProto.allowSnapshot(snapshotRoot); |
| } |
| |
| @Override // ClientProtocol |
| public void disallowSnapshot(String snapshot) throws IOException { |
| clientProto.disallowSnapshot(snapshot); |
| } |
| |
| @Override // ClientProtocol |
| public void renameSnapshot(String snapshotRoot, String snapshotOldName, |
| String snapshotNewName) throws IOException { |
| clientProto.renameSnapshot(snapshotRoot, snapshotOldName, snapshotNewName); |
| } |
| |
| @Override // ClientProtocol |
| public SnapshottableDirectoryStatus[] getSnapshottableDirListing() |
| throws IOException { |
| return clientProto.getSnapshottableDirListing(); |
| } |
| |
| @Override // ClientProtocol |
| public SnapshotStatus[] getSnapshotListing(String snapshotRoot) |
| throws IOException { |
| return clientProto.getSnapshotListing(snapshotRoot); |
| } |
| |
| @Override // ClientProtocol |
| public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot, |
| String earlierSnapshotName, String laterSnapshotName) throws IOException { |
| return clientProto.getSnapshotDiffReport( |
| snapshotRoot, earlierSnapshotName, laterSnapshotName); |
| } |
| |
| @Override // ClientProtocol |
| public SnapshotDiffReportListing getSnapshotDiffReportListing( |
| String snapshotRoot, String earlierSnapshotName, String laterSnapshotName, |
| byte[] startPath, int index) throws IOException { |
| return clientProto.getSnapshotDiffReportListing(snapshotRoot, |
| earlierSnapshotName, laterSnapshotName, startPath, index); |
| } |
| |
| @Override // ClientProtocol |
| public long addCacheDirective(CacheDirectiveInfo path, |
| EnumSet<CacheFlag> flags) throws IOException { |
| return clientProto.addCacheDirective(path, flags); |
| } |
| |
| @Override // ClientProtocol |
| public void modifyCacheDirective(CacheDirectiveInfo directive, |
| EnumSet<CacheFlag> flags) throws IOException { |
| clientProto.modifyCacheDirective(directive, flags); |
| } |
| |
| @Override // ClientProtocol |
| public void removeCacheDirective(long id) throws IOException { |
| clientProto.removeCacheDirective(id); |
| } |
| |
| @Override // ClientProtocol |
| public BatchedEntries<CacheDirectiveEntry> listCacheDirectives( |
| long prevId, CacheDirectiveInfo filter) throws IOException { |
| return clientProto.listCacheDirectives(prevId, filter); |
| } |
| |
| @Override // ClientProtocol |
| public void addCachePool(CachePoolInfo info) throws IOException { |
| clientProto.addCachePool(info); |
| } |
| |
| @Override // ClientProtocol |
| public void modifyCachePool(CachePoolInfo info) throws IOException { |
| clientProto.modifyCachePool(info); |
| } |
| |
| @Override // ClientProtocol |
| public void removeCachePool(String cachePoolName) throws IOException { |
| clientProto.removeCachePool(cachePoolName); |
| } |
| |
| @Override // ClientProtocol |
| public BatchedEntries<CachePoolEntry> listCachePools(String prevKey) |
| throws IOException { |
| return clientProto.listCachePools(prevKey); |
| } |
| |
| @Override // ClientProtocol |
| public void modifyAclEntries(String src, List<AclEntry> aclSpec) |
| throws IOException { |
| clientProto.modifyAclEntries(src, aclSpec); |
| } |
| |
| @Override // ClientProtocol |
| public void removeAclEntries(String src, List<AclEntry> aclSpec) |
| throws IOException { |
| clientProto.removeAclEntries(src, aclSpec); |
| } |
| |
| @Override // ClientProtocol |
| public void removeDefaultAcl(String src) throws IOException { |
| clientProto.removeDefaultAcl(src); |
| } |
| |
| @Override // ClientProtocol |
| public void removeAcl(String src) throws IOException { |
| clientProto.removeAcl(src); |
| } |
| |
| @Override // ClientProtocol |
| public void setAcl(String src, List<AclEntry> aclSpec) throws IOException { |
| clientProto.setAcl(src, aclSpec); |
| } |
| |
| @Override // ClientProtocol |
| public AclStatus getAclStatus(String src) throws IOException { |
| return clientProto.getAclStatus(src); |
| } |
| |
| @Override // ClientProtocol |
| public void createEncryptionZone(String src, String keyName) |
| throws IOException { |
| clientProto.createEncryptionZone(src, keyName); |
| } |
| |
| @Override // ClientProtocol |
| public EncryptionZone getEZForPath(String src) throws IOException { |
| return clientProto.getEZForPath(src); |
| } |
| |
| @Override // ClientProtocol |
| public BatchedEntries<EncryptionZone> listEncryptionZones(long prevId) |
| throws IOException { |
| return clientProto.listEncryptionZones(prevId); |
| } |
| |
| @Override // ClientProtocol |
| public void reencryptEncryptionZone(String zone, ReencryptAction action) |
| throws IOException { |
| clientProto.reencryptEncryptionZone(zone, action); |
| } |
| |
| @Override // ClientProtocol |
| public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus( |
| long prevId) throws IOException { |
| return clientProto.listReencryptionStatus(prevId); |
| } |
| |
| @Override // ClientProtocol |
| public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag) |
| throws IOException { |
| clientProto.setXAttr(src, xAttr, flag); |
| } |
| |
| @Override // ClientProtocol |
| public List<XAttr> getXAttrs(String src, List<XAttr> xAttrs) |
| throws IOException { |
| return clientProto.getXAttrs(src, xAttrs); |
| } |
| |
| @Override // ClientProtocol |
| public List<XAttr> listXAttrs(String src) throws IOException { |
| return clientProto.listXAttrs(src); |
| } |
| |
| @Override // ClientProtocol |
| public void removeXAttr(String src, XAttr xAttr) throws IOException { |
| clientProto.removeXAttr(src, xAttr); |
| } |
| |
| @Override // ClientProtocol |
| public void checkAccess(String path, FsAction mode) throws IOException { |
| clientProto.checkAccess(path, mode); |
| } |
| |
| @Override // ClientProtocol |
| public long getCurrentEditLogTxid() throws IOException { |
| return clientProto.getCurrentEditLogTxid(); |
| } |
| |
| @Override // ClientProtocol |
| public EventBatchList getEditsFromTxid(long txid) throws IOException { |
| return clientProto.getEditsFromTxid(txid); |
| } |
| |
| @Override // ClientProtocol |
| public DataEncryptionKey getDataEncryptionKey() throws IOException { |
| return clientProto.getDataEncryptionKey(); |
| } |
| |
| @Override // ClientProtocol |
| public String createSnapshot(String snapshotRoot, String snapshotName) |
| throws IOException { |
| return clientProto.createSnapshot(snapshotRoot, snapshotName); |
| } |
| |
| @Override // ClientProtocol |
| public void deleteSnapshot(String snapshotRoot, String snapshotName) |
| throws IOException { |
| clientProto.deleteSnapshot(snapshotRoot, snapshotName); |
| } |
| |
| @Override // ClientProtocol |
| public void setQuota(String path, long namespaceQuota, long storagespaceQuota, |
| StorageType type) throws IOException { |
| clientProto.setQuota(path, namespaceQuota, storagespaceQuota, type); |
| } |
| |
| @Override // ClientProtocol |
| public QuotaUsage getQuotaUsage(String path) throws IOException { |
| return clientProto.getQuotaUsage(path); |
| } |
| |
| @Override // ClientProtocol |
| public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { |
| clientProto.reportBadBlocks(blocks); |
| } |
| |
| @Override // ClientProtocol |
| public void unsetStoragePolicy(String src) throws IOException { |
| clientProto.unsetStoragePolicy(src); |
| } |
| |
| @Override // ClientProtocol |
| public BlockStoragePolicy getStoragePolicy(String path) throws IOException { |
| return clientProto.getStoragePolicy(path); |
| } |
| |
| @Override // ClientProtocol |
| public ErasureCodingPolicyInfo[] getErasureCodingPolicies() |
| throws IOException { |
| return clientProto.getErasureCodingPolicies(); |
| } |
| |
| @Override // ClientProtocol |
| public Map<String, String> getErasureCodingCodecs() throws IOException { |
| return clientProto.getErasureCodingCodecs(); |
| } |
| |
| @Override // ClientProtocol |
| public AddErasureCodingPolicyResponse[] addErasureCodingPolicies( |
| ErasureCodingPolicy[] policies) throws IOException { |
| return clientProto.addErasureCodingPolicies(policies); |
| } |
| |
| @Override // ClientProtocol |
| public void removeErasureCodingPolicy(String ecPolicyName) |
| throws IOException { |
| clientProto.removeErasureCodingPolicy(ecPolicyName); |
| } |
| |
| @Override // ClientProtocol |
| public void disableErasureCodingPolicy(String ecPolicyName) |
| throws IOException { |
| clientProto.disableErasureCodingPolicy(ecPolicyName); |
| } |
| |
| @Override // ClientProtocol |
| public void enableErasureCodingPolicy(String ecPolicyName) |
| throws IOException { |
| clientProto.enableErasureCodingPolicy(ecPolicyName); |
| } |
| |
| @Override // ClientProtocol |
| public ErasureCodingPolicy getErasureCodingPolicy(String src) |
| throws IOException { |
| return clientProto.getErasureCodingPolicy(src); |
| } |
| |
| @Override // ClientProtocol |
| public void setErasureCodingPolicy(String src, String ecPolicyName) |
| throws IOException { |
| clientProto.setErasureCodingPolicy(src, ecPolicyName); |
| } |
| |
| @Override // ClientProtocol |
| public void unsetErasureCodingPolicy(String src) throws IOException { |
| clientProto.unsetErasureCodingPolicy(src); |
| } |
| |
| @Override |
| public ECTopologyVerifierResult getECTopologyResultForPolicies( |
| String... policyNames) throws IOException { |
| return clientProto.getECTopologyResultForPolicies(policyNames); |
| } |
| |
| @Override // ClientProtocol |
| public ECBlockGroupStats getECBlockGroupStats() throws IOException { |
| return clientProto.getECBlockGroupStats(); |
| } |
| |
| @Override // ClientProtocol |
| public ReplicatedBlockStats getReplicatedBlockStats() throws IOException { |
| return clientProto.getReplicatedBlockStats(); |
| } |
| |
| @Deprecated |
| @Override // ClientProtocol |
| public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) |
| throws IOException { |
| return clientProto.listOpenFiles(prevId); |
| } |
| |
| @Override // ClientProtocol |
| public HAServiceProtocol.HAServiceState getHAServiceState() |
| throws IOException { |
| return clientProto.getHAServiceState(); |
| } |
| |
| @Override // ClientProtocol |
| public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId, |
| EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException { |
| return clientProto.listOpenFiles(prevId, openFilesTypes, path); |
| } |
| |
| @Override // ClientProtocol |
| public void msync() throws IOException { |
| clientProto.msync(); |
| } |
| |
| @Override // ClientProtocol |
| public void satisfyStoragePolicy(String path) throws IOException { |
| clientProto.satisfyStoragePolicy(path); |
| } |
| |
| @Override // ClientProtocol |
| public DatanodeInfo[] getSlowDatanodeReport() throws IOException { |
| return clientProto.getSlowDatanodeReport(); |
| } |
| |
| @Override // NamenodeProtocol |
| public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, |
| long minBlockSize, long hotBlockTimeInterval) throws IOException { |
| return nnProto.getBlocks(datanode, size, minBlockSize, |
| hotBlockTimeInterval); |
| } |
| |
| @Override // NamenodeProtocol |
| public ExportedBlockKeys getBlockKeys() throws IOException { |
| return nnProto.getBlockKeys(); |
| } |
| |
| @Override // NamenodeProtocol |
| public long getTransactionID() throws IOException { |
| return nnProto.getTransactionID(); |
| } |
| |
| @Override // NamenodeProtocol |
| public long getMostRecentCheckpointTxId() throws IOException { |
| return nnProto.getMostRecentCheckpointTxId(); |
| } |
| |
| @Override // NamenodeProtocol |
| public CheckpointSignature rollEditLog() throws IOException { |
| return nnProto.rollEditLog(); |
| } |
| |
| @Override // NamenodeProtocol |
| public NamespaceInfo versionRequest() throws IOException { |
| return nnProto.versionRequest(); |
| } |
| |
| @Override // NamenodeProtocol |
| public void errorReport(NamenodeRegistration registration, int errorCode, |
| String msg) throws IOException { |
| nnProto.errorReport(registration, errorCode, msg); |
| } |
| |
| @Override // NamenodeProtocol |
| public NamenodeRegistration registerSubordinateNamenode( |
| NamenodeRegistration registration) throws IOException { |
| return nnProto.registerSubordinateNamenode(registration); |
| } |
| |
| @Override // NamenodeProtocol |
| public NamenodeCommand startCheckpoint(NamenodeRegistration registration) |
| throws IOException { |
| return nnProto.startCheckpoint(registration); |
| } |
| |
| @Override // NamenodeProtocol |
| public void endCheckpoint(NamenodeRegistration registration, |
| CheckpointSignature sig) throws IOException { |
| nnProto.endCheckpoint(registration, sig); |
| } |
| |
| @Override // NamenodeProtocol |
| public RemoteEditLogManifest getEditLogManifest(long sinceTxId) |
| throws IOException { |
| return nnProto.getEditLogManifest(sinceTxId); |
| } |
| |
| @Override // NamenodeProtocol |
| public boolean isUpgradeFinalized() throws IOException { |
| return nnProto.isUpgradeFinalized(); |
| } |
| |
| @Override // NamenodeProtocol |
| public boolean isRollingUpgrade() throws IOException { |
| return nnProto.isRollingUpgrade(); |
| } |
| |
| @Override // NamenodeProtocol |
| public Long getNextSPSPath() throws IOException { |
| return nnProto.getNextSPSPath(); |
| } |
| |
| /** |
| * Locate the location with the matching block pool id. |
| * |
| * @param path Path to check. |
| * @param failIfLocked Fail the request if locked (top mount point). |
| * @param blockPoolId The block pool ID of the namespace to search for. |
| * @return Prioritized list of locations in the federated cluster. |
| * @throws IOException if the location for this path cannot be determined. |
| */ |
| protected RemoteLocation getLocationForPath( |
| String path, boolean failIfLocked, String blockPoolId) |
| throws IOException { |
| |
| final List<RemoteLocation> locations = |
| getLocationsForPath(path, failIfLocked); |
| |
| String nameserviceId = null; |
| Set<FederationNamespaceInfo> namespaces = |
| this.namenodeResolver.getNamespaces(); |
| for (FederationNamespaceInfo namespace : namespaces) { |
| if (namespace.getBlockPoolId().equals(blockPoolId)) { |
| nameserviceId = namespace.getNameserviceId(); |
| break; |
| } |
| } |
| if (nameserviceId != null) { |
| for (RemoteLocation location : locations) { |
| if (location.getNameserviceId().equals(nameserviceId)) { |
| return location; |
| } |
| } |
| } |
| throw new IOException( |
| "Cannot locate a nameservice for block pool " + blockPoolId); |
| } |
| |
| /** |
| * Get the possible locations of a path in the federated cluster. |
| * During the get operation, it will do the quota verification. |
| * |
| * @param path Path to check. |
| * @param failIfLocked Fail the request if locked (top mount point). |
| * @return Prioritized list of locations in the federated cluster. |
| * @throws IOException If the location for this path cannot be determined. |
| */ |
| protected List<RemoteLocation> getLocationsForPath(String path, |
| boolean failIfLocked) throws IOException { |
| return getLocationsForPath(path, failIfLocked, true); |
| } |
| |
| /** |
| * Get the possible locations of a path in the federated cluster. |
| * |
| * @param path Path to check. |
| * @param failIfLocked Fail the request if there is any mount point under |
| * the path. |
| * @param needQuotaVerify If need to do the quota verification. |
| * @return Prioritized list of locations in the federated cluster. |
| * @throws IOException If the location for this path cannot be determined. |
| */ |
| protected List<RemoteLocation> getLocationsForPath(String path, |
| boolean failIfLocked, boolean needQuotaVerify) throws IOException { |
| try { |
| if (failIfLocked) { |
| // check if there is any mount point under the path |
| final List<String> mountPoints = |
| this.subclusterResolver.getMountPoints(path); |
| if (mountPoints != null) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("The operation is not allowed because "); |
| if (mountPoints.isEmpty()) { |
| sb.append("the path: ") |
| .append(path) |
| .append(" is a mount point"); |
| } else { |
| sb.append("there are mount points: ") |
| .append(String.join(",", mountPoints)) |
| .append(" under the path: ") |
| .append(path); |
| } |
| throw new AccessControlException(sb.toString()); |
| } |
| } |
| |
| // Check the location for this path |
| final PathLocation location = |
| this.subclusterResolver.getDestinationForPath(path); |
| if (location == null) { |
| throw new NoLocationException(path, this.subclusterResolver.getClass()); |
| } |
| |
| // We may block some write operations |
| if (opCategory.get() == OperationCategory.WRITE) { |
| // Check if the path is in a read only mount point |
| if (isPathReadOnly(path)) { |
| if (this.rpcMonitor != null) { |
| this.rpcMonitor.routerFailureReadOnly(); |
| } |
| throw new IOException(path + " is in a read only mount point"); |
| } |
| |
| // Check quota |
| if (this.router.isQuotaEnabled() && needQuotaVerify) { |
| RouterQuotaUsage quotaUsage = this.router.getQuotaManager() |
| .getQuotaUsage(path); |
| if (quotaUsage != null) { |
| quotaUsage.verifyNamespaceQuota(); |
| quotaUsage.verifyStoragespaceQuota(); |
| quotaUsage.verifyQuotaByStorageType(); |
| } |
| } |
| } |
| |
| // Filter disabled subclusters |
| Set<String> disabled = namenodeResolver.getDisabledNamespaces(); |
| List<RemoteLocation> locs = new ArrayList<>(); |
| for (RemoteLocation loc : location.getDestinations()) { |
| if (!disabled.contains(loc.getNameserviceId())) { |
| locs.add(loc); |
| } |
| } |
| if (locs.isEmpty()) { |
| throw new NoLocationException(path, this.subclusterResolver.getClass()); |
| } |
| return locs; |
| } catch (IOException ioe) { |
| if (this.rpcMonitor != null) { |
| this.rpcMonitor.routerFailureStateStore(); |
| } |
| if (ioe instanceof StateStoreUnavailableException) { |
| checkSafeMode(); |
| } |
| throw ioe; |
| } |
| } |
| |
| /** |
| * Check if a path is in a read only mount point. |
| * |
| * @param path Path to check. |
| * @return If the path is in a read only mount point. |
| */ |
| private boolean isPathReadOnly(final String path) { |
| if (subclusterResolver instanceof MountTableResolver) { |
| try { |
| MountTableResolver mountTable = (MountTableResolver)subclusterResolver; |
| MountTable entry = mountTable.getMountPoint(path); |
| if (entry != null && entry.isReadOnly()) { |
| return true; |
| } |
| } catch (IOException e) { |
| LOG.error("Cannot get mount point", e); |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Get the user that is invoking this operation. |
| * |
| * @return Remote user group information. |
| * @throws IOException If we cannot get the user information. |
| */ |
| public static UserGroupInformation getRemoteUser() throws IOException { |
| UserGroupInformation ugi = CUR_USER.get(); |
| ugi = (ugi != null) ? ugi : Server.getRemoteUser(); |
| return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser(); |
| } |
| |
| /** |
| * Set super user credentials if needed. |
| */ |
| static void setCurrentUser(UserGroupInformation ugi) { |
| CUR_USER.set(ugi); |
| } |
| |
| /** |
| * Reset to discard super user credentials. |
| */ |
| static void resetCurrentUser() { |
| CUR_USER.set(null); |
| } |
| |
| /** |
| * Merge the outputs from multiple namespaces. |
| * |
| * @param <T> The type of the objects to merge. |
| * @param map Namespace to Output array. |
| * @param clazz Class of the values. |
| * @return Array with the outputs. |
| */ |
| static <T> T[] merge( |
| Map<FederationNamespaceInfo, T[]> map, Class<T> clazz) { |
| |
| // Put all results into a set to avoid repeats |
| Set<T> ret = new LinkedHashSet<>(); |
| for (T[] values : map.values()) { |
| if (values != null) { |
| for (T val : values) { |
| ret.add(val); |
| } |
| } |
| } |
| |
| return toArray(ret, clazz); |
| } |
| |
| /** |
| * Convert a set of values into an array. |
| * @param <T> The type of the return objects. |
| * @param set Input set. |
| * @param clazz Class of the values. |
| * @return Array with the values in set. |
| */ |
| static <T> T[] toArray(Collection<T> set, Class<T> clazz) { |
| @SuppressWarnings("unchecked") |
| T[] combinedData = (T[]) Array.newInstance(clazz, set.size()); |
| combinedData = set.toArray(combinedData); |
| return combinedData; |
| } |
| |
| /** |
| * Get quota module implementation. |
| * @return Quota module implementation |
| */ |
| public Quota getQuotaModule() { |
| return this.quotaCall; |
| } |
| |
| /** |
| * Get ClientProtocol module implementation. |
| * @return ClientProtocol implementation |
| */ |
| @VisibleForTesting |
| public RouterClientProtocol getClientProtocolModule() { |
| return this.clientProto; |
| } |
| |
| /** |
| * Get RPC metrics info. |
| * @return The instance of FederationRPCMetrics. |
| */ |
| public FederationRPCMetrics getRPCMetrics() { |
| return this.rpcMonitor.getRPCMetrics(); |
| } |
| |
| /** |
| * Check if a path should be in all subclusters. |
| * |
| * @param path Path to check. |
| * @return If a path should be in all subclusters. |
| */ |
| boolean isPathAll(final String path) { |
| if (subclusterResolver instanceof MountTableResolver) { |
| try { |
| MountTableResolver mountTable = (MountTableResolver) subclusterResolver; |
| MountTable entry = mountTable.getMountPoint(path); |
| if (entry != null) { |
| return entry.isAll(); |
| } |
| } catch (IOException e) { |
| LOG.error("Cannot get mount point", e); |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Check if a path supports failed subclusters. |
| * |
| * @param path Path to check. |
| * @return If a path should support failed subclusters. |
| */ |
| boolean isPathFaultTolerant(final String path) { |
| if (subclusterResolver instanceof MountTableResolver) { |
| try { |
| MountTableResolver mountTable = (MountTableResolver) subclusterResolver; |
| MountTable entry = mountTable.getMountPoint(path); |
| if (entry != null) { |
| return entry.isFaultTolerant(); |
| } |
| } catch (IOException e) { |
| LOG.error("Cannot get mount point", e); |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Check if call needs to be invoked to all the locations. The call is |
| * supposed to be invoked in all the locations in case the order of the mount |
| * entry is amongst HASH_ALL, RANDOM or SPACE or if the source is itself a |
| * mount entry. |
| * @param path The path on which the operation need to be invoked. |
| * @return true if the call is supposed to invoked on all locations. |
| * @throws IOException |
| */ |
| boolean isInvokeConcurrent(final String path) throws IOException { |
| if (subclusterResolver instanceof MountTableResolver) { |
| MountTableResolver mountTableResolver = |
| (MountTableResolver) subclusterResolver; |
| List<String> mountPoints = mountTableResolver.getMountPoints(path); |
| // If this is a mount point, we need to invoke everywhere. |
| if (mountPoints != null) { |
| return true; |
| } |
| return isPathAll(path); |
| } |
| return false; |
| } |
| |
| @Override |
| public void refreshUserToGroupsMappings() throws IOException { |
| routerProto.refreshUserToGroupsMappings(); |
| } |
| |
| @Override |
| public void refreshSuperUserGroupsConfiguration() throws IOException { |
| routerProto.refreshSuperUserGroupsConfiguration(); |
| } |
| |
| @Override |
| public String[] getGroupsForUser(String user) throws IOException { |
| return routerProto.getGroupsForUser(user); |
| } |
| |
| public int getRouterFederationRenameCount() { |
| return clientProto.getRouterFederationRenameCount(); |
| } |
| |
| public int getSchedulerJobCount() { |
| if (fedRenameScheduler == null) { |
| return 0; |
| } |
| return fedRenameScheduler.getAllJobs().size(); |
| } |
| |
| public String refreshFairnessPolicyController() { |
| return rpcClient.refreshFairnessPolicyController(new Configuration()); |
| } |
| |
| /** |
| * Get the slow running datanodes report with a timeout. |
| * |
| * @param requireResponse If we require all the namespaces to report. |
| * @param timeOutMs Time out for the reply in milliseconds. |
| * @return List of datanodes. |
| * @throws IOException If it cannot get the report. |
| */ |
| public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOutMs) |
| throws IOException { |
| checkOperation(OperationCategory.UNCHECKED); |
| |
| Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>(); |
| RemoteMethod method = new RemoteMethod("getSlowDatanodeReport"); |
| |
| Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); |
| Map<FederationNamespaceInfo, DatanodeInfo[]> results = |
| rpcClient.invokeConcurrent(nss, method, requireResponse, false, |
| timeOutMs, DatanodeInfo[].class); |
| updateDnMap(results, datanodesMap); |
| // Map -> Array |
| Collection<DatanodeInfo> datanodes = datanodesMap.values(); |
| return toArray(datanodes, DatanodeInfo.class); |
| } |
| |
| private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results, |
| Map<String, DatanodeInfo> datanodesMap) { |
| for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry : |
| results.entrySet()) { |
| FederationNamespaceInfo ns = entry.getKey(); |
| DatanodeInfo[] result = entry.getValue(); |
| for (DatanodeInfo node : result) { |
| String nodeId = node.getXferAddr(); |
| DatanodeInfo dn = datanodesMap.get(nodeId); |
| if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) { |
| // Add the subcluster as a suffix to the network location |
| node.setNetworkLocation( |
| NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() + |
| node.getNetworkLocation()); |
| datanodesMap.put(nodeId, node); |
| } else { |
| LOG.debug("{} is in multiple subclusters", nodeId); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Deals with loading datanode report into the cache and refresh. |
| */ |
| private class DatanodeReportCacheLoader |
| extends CacheLoader<DatanodeReportType, DatanodeInfo[]> { |
| |
| private ListeningExecutorService executorService; |
| |
| DatanodeReportCacheLoader() { |
| ThreadFactory threadFactory = new ThreadFactoryBuilder() |
| .setNameFormat("DatanodeReport-Cache-Reload") |
| .setDaemon(true) |
| .build(); |
| |
| executorService = MoreExecutors.listeningDecorator( |
| Executors.newSingleThreadExecutor(threadFactory)); |
| } |
| |
| @Override |
| public DatanodeInfo[] load(DatanodeReportType type) throws Exception { |
| return getCachedDatanodeReportImpl(type); |
| } |
| |
| /** |
| * Override the reload method to provide an asynchronous implementation, |
| * so that the query will not be slowed down by the cache refresh. It |
| * will return the old cache value and schedule a background refresh. |
| */ |
| @Override |
| public ListenableFuture<DatanodeInfo[]> reload( |
| final DatanodeReportType type, DatanodeInfo[] oldValue) |
| throws Exception { |
| return executorService.submit(() -> load(type)); |
| } |
| } |
| } |