/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
import static org.apache.hadoop.util.Time.now;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import com.google.common.collect.Lists;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSLimitException;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.DatanodeLifelineProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.RetryCache;
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.proto.RefreshAuthorizationPolicyProtocolProtos.RefreshAuthorizationPolicyProtocolService;
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService;
import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolPB;
import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.tracing.SpanReceiverInfo;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import javax.annotation.Nonnull;
/**
* This class is responsible for handling all of the RPC calls to the NameNode.
* It is created, started, and stopped by {@link NameNode}.
*/
@InterfaceAudience.Private
@VisibleForTesting
public class NameNodeRpcServer implements NamenodeProtocols {
private static final Logger LOG = NameNode.LOG;
private static final Logger stateChangeLog = NameNode.stateChangeLog;
private static final Logger blockStateChangeLog = NameNode
.blockStateChangeLog;
// Dependencies from other parts of NN.
protected final FSNamesystem namesystem;
protected final NameNode nn;
private final NameNodeMetrics metrics;
private final RetryCache retryCache;
private final boolean serviceAuthEnabled;
/** The RPC server that listens to requests from DataNodes */
private final RPC.Server serviceRpcServer;
private final InetSocketAddress serviceRPCAddress;
/** The RPC server that listens to lifeline requests */
private final RPC.Server lifelineRpcServer;
private final InetSocketAddress lifelineRPCAddress;
/** The RPC server that listens to requests from clients */
protected final RPC.Server clientRpcServer;
protected final InetSocketAddress clientRpcAddress;
private final String minimumDataNodeVersion;
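  /**
   * Create the NameNode RPC server: build the protobuf translators for each
   * protocol this class implements, then construct the client, service, and
   * lifeline RPC endpoints as configured.
   *
   * @param conf configuration supplying bind addresses and handler counts
   * @param nn the NameNode that owns this RPC server
   * @throws IOException if an RPC server cannot be created
   */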
public NameNodeRpcServer(Configuration conf, NameNode nn)
throws IOException {
this.nn = nn;
this.namesystem = nn.getNamesystem();
this.retryCache = namesystem.getRetryCache();
this.metrics = NameNode.getNameNodeMetrics();
int handlerCount =
conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
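    // Each *ServerSideTranslatorPB below adapts one protobuf wire protocol
    // to the corresponding NamenodeProtocols methods implemented by this
    // class, and is wrapped as a BlockingService for the RPC servers.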
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine.class);
ClientNamenodeProtocolServerSideTranslatorPB
clientProtocolServerTranslator =
new ClientNamenodeProtocolServerSideTranslatorPB(this);
BlockingService clientNNPbService = ClientNamenodeProtocol.
newReflectiveBlockingService(clientProtocolServerTranslator);
int maxDataLength = conf.getInt(IPC_MAXIMUM_DATA_LENGTH,
IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
new DatanodeProtocolServerSideTranslatorPB(this, maxDataLength);
BlockingService dnProtoPbService = DatanodeProtocolService
.newReflectiveBlockingService(dnProtoPbTranslator);
DatanodeLifelineProtocolServerSideTranslatorPB lifelineProtoPbTranslator =
new DatanodeLifelineProtocolServerSideTranslatorPB(this);
BlockingService lifelineProtoPbService = DatanodeLifelineProtocolService
.newReflectiveBlockingService(lifelineProtoPbTranslator);
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
new NamenodeProtocolServerSideTranslatorPB(this);
BlockingService NNPbService = NamenodeProtocolService
.newReflectiveBlockingService(namenodeProtocolXlator);
RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator =
new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);
BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService
.newReflectiveBlockingService(refreshAuthPolicyXlator);
RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator =
new RefreshUserMappingsProtocolServerSideTranslatorPB(this);
BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService
.newReflectiveBlockingService(refreshUserMappingXlator);
RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator =
new RefreshCallQueueProtocolServerSideTranslatorPB(this);
BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
.newReflectiveBlockingService(refreshCallQueueXlator);
GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
new GenericRefreshProtocolServerSideTranslatorPB(this);
BlockingService genericRefreshService = GenericRefreshProtocolService
.newReflectiveBlockingService(genericRefreshXlator);
GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
new GetUserMappingsProtocolServerSideTranslatorPB(this);
BlockingService getUserMappingService = GetUserMappingsProtocolService
.newReflectiveBlockingService(getUserMappingXlator);
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
new HAServiceProtocolServerSideTranslatorPB(this);
BlockingService haPbService = HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
= new ReconfigurationProtocolServerSideTranslatorPB(this);
BlockingService reconfigurationPbService = ReconfigurationProtocolService
.newReflectiveBlockingService(reconfigurationProtocolXlator);
TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
new TraceAdminProtocolServerSideTranslatorPB(this);
BlockingService traceAdminService = TraceAdminService
.newReflectiveBlockingService(traceAdminXlator);
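    // Force static initialization of the legacy Writable RPC engine before
    // the servers are built.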
WritableRpcEngine.ensureInitialized();
InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
if (serviceRpcAddr != null) {
String bindHost = nn.getServiceRpcServerBindHost(conf);
if (bindHost == null) {
bindHost = serviceRpcAddr.getHostName();
}
LOG.info("Service RPC server is binding to " + bindHost + ":" +
serviceRpcAddr.getPort());
int serviceHandlerCount =
conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
this.serviceRpcServer = new RPC.Builder(conf)
.setProtocol(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress(bindHost)
.setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();
// Add all the RPC protocols that the namenode implements
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
serviceRpcServer);
DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
reconfigurationPbService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
serviceRpcServer);
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
serviceRpcServer);
DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
refreshAuthService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
refreshUserMappingService, serviceRpcServer);
      // Refreshing the call queue is also supported here, in case the
      // client RPC queue is full
DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
refreshCallQueueService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
genericRefreshService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
getUserMappingService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class,
traceAdminService, serviceRpcServer);
// Update the address with the correct port
InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress();
serviceRPCAddress = new InetSocketAddress(
serviceRpcAddr.getHostName(), listenAddr.getPort());
nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
} else {
serviceRpcServer = null;
serviceRPCAddress = null;
}
InetSocketAddress lifelineRpcAddr = nn.getLifelineRpcServerAddress(conf);
if (lifelineRpcAddr != null) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
String bindHost = nn.getLifelineRpcServerBindHost(conf);
if (bindHost == null) {
bindHost = lifelineRpcAddr.getHostName();
}
LOG.info("Lifeline RPC server is binding to {}:{}", bindHost,
lifelineRpcAddr.getPort());
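      // Use the explicitly configured lifeline handler count if positive;
      // otherwise derive it from the main handler count using the configured
      // ratio, with a floor of one handler.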
int lifelineHandlerCount = conf.getInt(
DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY, 0);
if (lifelineHandlerCount <= 0) {
float lifelineHandlerRatio = conf.getFloat(
DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY,
DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT);
lifelineHandlerCount = Math.max(
(int)(handlerCount * lifelineHandlerRatio), 1);
}
lifelineRpcServer = new RPC.Builder(conf)
.setProtocol(HAServiceProtocolPB.class)
.setInstance(haPbService)
.setBindAddress(bindHost)
.setPort(lifelineRpcAddr.getPort())
.setNumHandlers(lifelineHandlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();
DFSUtil.addPBProtocol(conf, DatanodeLifelineProtocolPB.class,
lifelineProtoPbService, lifelineRpcServer);
// Update the address with the correct port
InetSocketAddress listenAddr = lifelineRpcServer.getListenerAddress();
lifelineRPCAddress = new InetSocketAddress(lifelineRpcAddr.getHostName(),
listenAddr.getPort());
nn.setRpcLifelineServerAddress(conf, lifelineRPCAddress);
} else {
lifelineRpcServer = null;
lifelineRPCAddress = null;
}
InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
String bindHost = nn.getRpcServerBindHost(conf);
if (bindHost == null) {
bindHost = rpcAddr.getHostName();
}
LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort());
boolean enableStateContext = conf.getBoolean(
DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY,
DFS_NAMENODE_STATE_CONTEXT_ENABLED_DEFAULT);
LOG.info("Enable NameNode state context:" + enableStateContext);
GlobalStateIdContext stateIdContext = null;
if (enableStateContext) {
      stateIdContext = new GlobalStateIdContext(namesystem);
}
this.clientRpcServer = new RPC.Builder(conf)
.setProtocol(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService).setBindAddress(bindHost)
.setPort(rpcAddr.getPort()).setNumHandlers(handlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.setAlignmentContext(stateIdContext)
.build();
// Add all the RPC protocols that the namenode implements
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
clientRpcServer);
DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
reconfigurationPbService, clientRpcServer);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
clientRpcServer);
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
clientRpcServer);
DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
refreshAuthService, clientRpcServer);
DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
refreshUserMappingService, clientRpcServer);
DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
refreshCallQueueService, clientRpcServer);
DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
genericRefreshService, clientRpcServer);
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
getUserMappingService, clientRpcServer);
DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class,
traceAdminService, clientRpcServer);
// set service-level authorization security policy
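    // Note: '=' in the condition below is an intentional assignment to the
    // final field serviceAuthEnabled, not a mistyped comparison.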
if (serviceAuthEnabled =
conf.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
if (serviceRpcServer != null) {
serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
}
if (lifelineRpcServer != null) {
lifelineRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
}
}
// The rpc-server port can be ephemeral... ensure we have the correct info
InetSocketAddress listenAddr = clientRpcServer.getListenerAddress();
clientRpcAddress = new InetSocketAddress(
rpcAddr.getHostName(), listenAddr.getPort());
nn.setRpcServerAddress(conf, clientRpcAddress);
minimumDataNodeVersion = conf.get(
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
    // Set terse exceptions whose stack traces won't be logged
this.clientRpcServer.addTerseExceptions(SafeModeException.class,
FileNotFoundException.class,
HadoopIllegalArgumentException.class,
FileAlreadyExistsException.class,
InvalidPathException.class,
ParentNotDirectoryException.class,
UnresolvedLinkException.class,
AlreadyBeingCreatedException.class,
QuotaExceededException.class,
RecoveryInProgressException.class,
AccessControlException.class,
InvalidToken.class,
LeaseExpiredException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class,
QuotaByStorageTypeExceededException.class,
AclException.class,
FSLimitException.PathComponentTooLongException.class,
FSLimitException.MaxDirectoryItemsExceededException.class);
clientRpcServer.addSuppressedLoggingExceptions(StandbyException.class,
UnresolvedPathException.class);
clientRpcServer.setTracer(nn.tracer);
if (serviceRpcServer != null) {
serviceRpcServer.setTracer(nn.tracer);
}
if (lifelineRpcServer != null) {
lifelineRpcServer.setTracer(nn.tracer);
}
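    // Optionally open auxiliary listener ports on the client RPC server,
    // e.g. to allow changing the main RPC port without client downtime.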
int[] auxiliaryPorts =
conf.getInts(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY);
if (auxiliaryPorts != null && auxiliaryPorts.length != 0) {
for (int auxiliaryPort : auxiliaryPorts) {
this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
}
}
}
/** Allow access to the lifeline RPC server for testing */
@VisibleForTesting
RPC.Server getLifelineRpcServer() {
return lifelineRpcServer;
}
/** Allow access to the client RPC server for testing */
@VisibleForTesting
public RPC.Server getClientRpcServer() {
return clientRpcServer;
}
/** Allow access to the service RPC server for testing */
@VisibleForTesting
RPC.Server getServiceRpcServer() {
return serviceRpcServer;
}
/**
* Start client and service RPC servers.
*/
void start() {
clientRpcServer.start();
if (serviceRpcServer != null) {
serviceRpcServer.start();
}
if (lifelineRpcServer != null) {
lifelineRpcServer.start();
}
}
/**
* Wait until the RPC servers have shutdown.
*/
void join() throws InterruptedException {
clientRpcServer.join();
if (serviceRpcServer != null) {
serviceRpcServer.join();
}
if (lifelineRpcServer != null) {
lifelineRpcServer.join();
}
}
/**
* Stop client and service RPC servers.
*/
void stop() {
if (clientRpcServer != null) {
clientRpcServer.stop();
}
if (serviceRpcServer != null) {
serviceRpcServer.stop();
}
if (lifelineRpcServer != null) {
lifelineRpcServer.stop();
}
}
InetSocketAddress getLifelineRpcAddress() {
return lifelineRPCAddress;
}
InetSocketAddress getServiceRpcAddress() {
return serviceRPCAddress;
}
@VisibleForTesting
public InetSocketAddress getRpcAddress() {
return clientRpcAddress;
}
@VisibleForTesting
public Set<InetSocketAddress> getAuxiliaryRpcAddresses() {
return clientRpcServer.getAuxiliaryListenerAddresses();
}
private static UserGroupInformation getRemoteUser() throws IOException {
return NameNode.getRemoteUser();
}
/////////////////////////////////////////////////////
// NamenodeProtocol
/////////////////////////////////////////////////////
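  /**
   * Get a list of blocks (with locations) stored on the given datanode, up
   * to the given total size; used by the Balancer to pick blocks to move.
   */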
@Override // NamenodeProtocol
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
throws IOException {
    if (size <= 0) {
      throw new IllegalArgumentException(
          "Unexpected non-positive size: " + size);
}
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return namesystem.getBlocks(datanode, size);
}
@Override // NamenodeProtocol
public ExportedBlockKeys getBlockKeys() throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return namesystem.getBlockManager().getBlockKeys();
}
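  /**
   * Handle an error report from a subordinate namenode; a FATAL error code
   * causes the reporting backup node to be released.
   */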
@Override // NamenodeProtocol
public void errorReport(NamenodeRegistration registration,
int errorCode,
String msg) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.UNCHECKED);
namesystem.checkSuperuserPrivilege();
verifyRequest(registration);
LOG.info("Error report from " + registration + ": " + msg);
if (errorCode == FATAL) {
namesystem.releaseBackupNode(registration);
}
}
@Override // NamenodeProtocol
public NamenodeRegistration registerSubordinateNamenode(
NamenodeRegistration registration) throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
verifyLayoutVersion(registration.getVersion());
NamenodeRegistration myRegistration = nn.setRegistration();
namesystem.registerBackupNode(registration, myRegistration);
return myRegistration;
}
@Override // NamenodeProtocol
public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
verifyRequest(registration);
    if (!nn.isRole(NamenodeRole.NAMENODE)) {
      throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
    }
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (NamenodeCommand) cacheEntry.getPayload();
}
NamenodeCommand ret = null;
try {
ret = namesystem.startCheckpoint(registration, nn.setRegistration());
} finally {
RetryCache.setState(cacheEntry, ret != null, ret);
}
return ret;
}
@Override // NamenodeProtocol
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.endCheckpoint(registration, sig);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
checkNNStartup();
return namesystem.getDelegationToken(renewer);
}
@Override // ClientProtocol
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException {
checkNNStartup();
return namesystem.renewDelegationToken(token);
}
@Override // ClientProtocol
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException {
checkNNStartup();
namesystem.cancelDelegationToken(token);
}
@Override // ClientProtocol
public LocatedBlocks getBlockLocations(String src,
long offset,
long length)
throws IOException {
checkNNStartup();
metrics.incrGetBlockLocations();
LocatedBlocks locatedBlocks =
namesystem.getBlockLocations(getClientMachine(), src, offset, length);
return locatedBlocks;
}
@Override // ClientProtocol
public FsServerDefaults getServerDefaults() throws IOException {
checkNNStartup();
return namesystem.getServerDefaults();
}
@Override // ClientProtocol
public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions)
throws IOException {
checkNNStartup();
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.create: file "
+src+" for "+clientName+" at "+clientMachine);
}
if (!checkPathLength(src)) {
throw new IOException("create: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (HdfsFileStatus) cacheEntry.getPayload();
}
HdfsFileStatus status = null;
try {
PermissionStatus perm = new PermissionStatus(getRemoteUser()
.getShortUserName(), null, masked);
status = namesystem.startFile(src, perm, clientName, clientMachine,
flag.get(), createParent, replication, blockSize, supportedVersions,
cacheEntry != null);
} finally {
RetryCache.setState(cacheEntry, status != null, status);
}
metrics.incrFilesCreated();
metrics.incrCreateFileOps();
return status;
}
@Override // ClientProtocol
public LastBlockWithStatus append(String src, String clientName,
EnumSetWritable<CreateFlag> flag) throws IOException {
checkNNStartup();
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.append: file "
+src+" for "+clientName+" at "+clientMachine);
}
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (LastBlockWithStatus) cacheEntry.getPayload();
}
LastBlockWithStatus info = null;
boolean success = false;
try {
info = namesystem.appendFile(src, clientName, clientMachine, flag.get(),
cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success, info);
}
metrics.incrFilesAppended();
return info;
}
@Override // ClientProtocol
public boolean recoverLease(String src, String clientName) throws IOException {
checkNNStartup();
String clientMachine = getClientMachine();
return namesystem.recoverLease(src, clientName, clientMachine);
}
@Override // ClientProtocol
public boolean setReplication(String src, short replication)
throws IOException {
checkNNStartup();
return namesystem.setReplication(src, replication);
}
@Override
public void unsetStoragePolicy(String src)
throws IOException {
checkNNStartup();
namesystem.unsetStoragePolicy(src);
}
@Override
public void setStoragePolicy(String src, String policyName)
throws IOException {
checkNNStartup();
namesystem.setStoragePolicy(src, policyName);
}
@Override
public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
checkNNStartup();
return namesystem.getStoragePolicy(path);
}
@Override
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
checkNNStartup();
return namesystem.getStoragePolicies();
}
@Override // ClientProtocol
public void setPermission(String src, FsPermission permissions)
throws IOException {
checkNNStartup();
namesystem.setPermission(src, permissions);
}
@Override // ClientProtocol
public void setOwner(String src, String username, String groupname)
throws IOException {
checkNNStartup();
namesystem.setOwner(src, username, groupname);
}
@Override
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
throws IOException {
checkNNStartup();
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
if (locatedBlock != null) {
metrics.incrAddBlockOps();
}
return locatedBlock;
}
@Override // ClientProtocol
public LocatedBlock getAdditionalDatanode(final String src,
final long fileId, final ExtendedBlock blk,
final DatanodeInfo[] existings, final String[] existingStorageIDs,
final DatanodeInfo[] excludes,
final int numAdditionalNodes, final String clientName
) throws IOException {
checkNNStartup();
if (LOG.isDebugEnabled()) {
LOG.debug("getAdditionalDatanode: src=" + src
+ ", fileId=" + fileId
+ ", blk=" + blk
+ ", existings=" + Arrays.asList(existings)
+ ", excludes=" + Arrays.asList(excludes)
+ ", numAdditionalNodes=" + numAdditionalNodes
+ ", clientName=" + clientName);
}
metrics.incrGetAdditionalDatanodeOps();
Set<Node> excludeSet = null;
if (excludes != null) {
excludeSet = new HashSet<Node>(excludes.length);
for (Node node : excludes) {
excludeSet.add(node);
}
}
LocatedBlock locatedBlock = namesystem.getAdditionalDatanode(src, fileId,
blk, existings, existingStorageIDs, excludeSet, numAdditionalNodes,
clientName);
return locatedBlock;
}
/**
* The client needs to give up on the block.
*/
@Override // ClientProtocol
public void abandonBlock(ExtendedBlock b, long fileId, String src,
String holder) throws IOException {
checkNNStartup();
namesystem.abandonBlock(b, fileId, src, holder);
}
@Override // ClientProtocol
public boolean complete(String src, String clientName,
ExtendedBlock last, long fileId)
throws IOException {
checkNNStartup();
return namesystem.completeFile(src, clientName, last, fileId);
}
/**
* The client has detected an error on the specified located blocks
* and is reporting them to the server. For now, the namenode will
   * mark the blocks as corrupt. In the future we might
   * check whether the blocks are actually corrupt.
*/
@Override // ClientProtocol, DatanodeProtocol
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
checkNNStartup();
namesystem.reportBadBlocks(blocks);
}
@Override // ClientProtocol
public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
throws IOException {
checkNNStartup();
return namesystem.updateBlockForPipeline(block, clientName);
}
@Override // ClientProtocol
public void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes,
newStorageIDs, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // DatanodeProtocol
public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
String[] newtargetstorages)
throws IOException {
checkNNStartup();
namesystem.commitBlockSynchronization(block, newgenerationstamp,
newlength, closeFile, deleteblock, newtargets, newtargetstorages);
}
@Override // ClientProtocol
public long getPreferredBlockSize(String filename)
throws IOException {
checkNNStartup();
return namesystem.getPreferredBlockSize(filename);
}
@Deprecated
@Override // ClientProtocol
public boolean rename(String src, String dst) throws IOException {
checkNNStartup();
    if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
}
if (!checkPathLength(dst)) {
throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
boolean ret = false;
try {
ret = namesystem.renameTo(src, dst, cacheEntry != null);
} finally {
RetryCache.setState(cacheEntry, ret);
}
if (ret) {
metrics.incrFilesRenamed();
}
return ret;
}
@Override // ClientProtocol
public void concat(String trg, String[] src) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.concat(trg, src, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
public void rename2(String src, String dst, Options.Rename... options)
throws IOException {
checkNNStartup();
    if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
}
if (!checkPathLength(dst)) {
throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.renameTo(src, dst, cacheEntry != null, options);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
metrics.incrFilesRenamed();
}
@Override // ClientProtocol
public boolean truncate(String src, long newLength, String clientName)
      throws IOException {
    checkNNStartup();
    if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.truncate: " + src + " to " +
newLength);
}
String clientMachine = getClientMachine();
try {
return namesystem.truncate(
src, newLength, clientName, clientMachine, now());
} finally {
metrics.incrFilesTruncated();
}
}
@Override // ClientProtocol
public boolean delete(String src, boolean recursive) throws IOException {
checkNNStartup();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
+ ", recursive=" + recursive);
}
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
boolean ret = false;
try {
ret = namesystem.delete(src, recursive, cacheEntry != null);
} finally {
RetryCache.setState(cacheEntry, ret);
}
    if (ret) {
      metrics.incrDeleteFileOps();
    }
return ret;
}
/**
   * Check that the path length does not exceed the maximum. Returns true if
   * the length and depth are okay; returns false if the length is too long
   * or the depth is too great.
*/
private boolean checkPathLength(String src) {
Path srcPath = new Path(src);
return (src.length() <= MAX_PATH_LENGTH &&
srcPath.depth() <= MAX_PATH_DEPTH);
}
@Override // ClientProtocol
public boolean mkdirs(String src, FsPermission masked, boolean createParent)
throws IOException {
checkNNStartup();
    if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
}
if (!checkPathLength(src)) {
throw new IOException("mkdirs: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
return namesystem.mkdirs(src,
new PermissionStatus(getRemoteUser().getShortUserName(),
null, masked), createParent);
}
@Override // ClientProtocol
public void renewLease(String clientName) throws IOException {
checkNNStartup();
namesystem.renewLease(clientName);
}
@Override // ClientProtocol
public DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation) throws IOException {
checkNNStartup();
DirectoryListing files = namesystem.getListing(
src, startAfter, needLocation);
if (files != null) {
metrics.incrGetListingOps();
metrics.incrFilesInGetListingOps(files.getPartialListing().length);
}
return files;
}
@Override // ClientProtocol
public HdfsFileStatus getFileInfo(String src) throws IOException {
checkNNStartup();
metrics.incrFileInfoOps();
return namesystem.getFileInfo(src, true);
}
@Override // ClientProtocol
  public boolean isFileClosed(String src) throws IOException {
checkNNStartup();
return namesystem.isFileClosed(src);
}
@Override // ClientProtocol
public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
checkNNStartup();
metrics.incrFileInfoOps();
return namesystem.getFileInfo(src, false);
}
@Override // ClientProtocol
public long[] getStats() throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.READ);
return namesystem.getStats();
}
@Override // ClientProtocol
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
throws IOException {
checkNNStartup();
    DatanodeInfo[] results = namesystem.datanodeReport(type);
return results;
}
@Override // ClientProtocol
public DatanodeStorageReport[] getDatanodeStorageReport(
DatanodeReportType type) throws IOException {
checkNNStartup();
final DatanodeStorageReport[] reports = namesystem.getDatanodeStorageReport(type);
return reports;
}
@Override // ClientProtocol
public boolean setSafeMode(SafeModeAction action, boolean isChecked)
throws IOException {
checkNNStartup();
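    // SAFEMODE_GET is a read-only query; entering or leaving safe mode is
    // treated as a write so that it is only served by the active NameNode.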
OperationCategory opCategory = OperationCategory.UNCHECKED;
if (isChecked) {
if (action == SafeModeAction.SAFEMODE_GET) {
opCategory = OperationCategory.READ;
} else {
opCategory = OperationCategory.WRITE;
}
}
namesystem.checkOperation(opCategory);
return namesystem.setSafeMode(action);
}
@Override // ClientProtocol
public boolean restoreFailedStorage(String arg) throws IOException {
checkNNStartup();
return namesystem.restoreFailedStorage(arg);
}
@Override // ClientProtocol
public void saveNamespace() throws IOException {
checkNNStartup();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.saveNamespace();
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
public long rollEdits() throws AccessControlException, IOException {
checkNNStartup();
CheckpointSignature sig = namesystem.rollEditLog();
return sig.getCurSegmentTxId();
}
@Override // ClientProtocol
public void refreshNodes() throws IOException {
checkNNStartup();
namesystem.refreshNodes();
}
@Override // NamenodeProtocol
public long getTransactionID() throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.UNCHECKED);
namesystem.checkSuperuserPrivilege();
return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
}
@Override // NamenodeProtocol
public long getMostRecentCheckpointTxId() throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.UNCHECKED);
namesystem.checkSuperuserPrivilege();
return namesystem.getFSImage().getMostRecentCheckpointTxId();
}
@Override // NamenodeProtocol
public CheckpointSignature rollEditLog() throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return namesystem.rollEditLog();
}
@Override // NamenodeProtocol
public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.READ);
namesystem.checkSuperuserPrivilege();
return namesystem.getEditLog().getEditLogManifest(sinceTxId);
}
@Override // NamenodeProtocol
public boolean isUpgradeFinalized() throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return namesystem.isUpgradeFinalized();
}
@Override // NamenodeProtocol
public boolean isRollingUpgrade() throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return namesystem.isRollingUpgrade();
}
@Override // ClientProtocol
public void finalizeUpgrade() throws IOException {
checkNNStartup();
namesystem.finalizeUpgrade();
}
@Override // ClientProtocol
public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) throws IOException {
checkNNStartup();
LOG.info("rollingUpgrade " + action);
switch(action) {
case QUERY:
return namesystem.queryRollingUpgrade();
case PREPARE:
return namesystem.startRollingUpgrade();
case FINALIZE:
return namesystem.finalizeRollingUpgrade();
default:
throw new UnsupportedActionException(action + " is not yet supported.");
}
}
@Override // ClientProtocol
public void metaSave(String filename) throws IOException {
checkNNStartup();
namesystem.metaSave(filename);
}
@Override // ClientProtocol
public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
throws IOException {
checkNNStartup();
return namesystem.listOpenFiles(prevId);
}
@Override // ClientProtocol
public void msync() throws IOException {
// Check for write access to ensure that msync only happens on active
namesystem.checkOperation(OperationCategory.WRITE);
}
@Override // ClientProtocol
public HAServiceState getHAServiceState() throws IOException {
checkNNStartup();
return nn.getServiceStatus().getState();
}
@Override // ClientProtocol
public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
throws IOException {
checkNNStartup();
String[] cookieTab = new String[] { cookie };
Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
namesystem.listCorruptFileBlocks(path, cookieTab);
String[] files = new String[fbs.size()];
int i = 0;
for(FSNamesystem.CorruptFileBlockInfo fb: fbs) {
files[i++] = fb.path;
}
return new CorruptFileBlocks(files, cookieTab[0]);
}
/**
* Tell all datanodes to use a new, non-persistent bandwidth value for
* dfs.datanode.balance.bandwidthPerSec.
* @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
   * @throws IOException if the request cannot be processed.
*/
@Override // ClientProtocol
public void setBalancerBandwidth(long bandwidth) throws IOException {
checkNNStartup();
namesystem.setBalancerBandwidth(bandwidth);
}
@Override // ClientProtocol
public ContentSummary getContentSummary(String path) throws IOException {
checkNNStartup();
return namesystem.getContentSummary(path);
}
@Override // ClientProtocol
public QuotaUsage getQuotaUsage(String path) throws IOException {
checkNNStartup();
return namesystem.getQuotaUsage(path);
}
@Override // ClientProtocol
public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
StorageType type)
throws IOException {
checkNNStartup();
namesystem.setQuota(path, namespaceQuota, storagespaceQuota, type);
}
@Override // ClientProtocol
public void fsync(String src, long fileId, String clientName,
long lastBlockLength)
throws IOException {
checkNNStartup();
namesystem.fsync(src, fileId, clientName, lastBlockLength);
}
@Override // ClientProtocol
public void setTimes(String src, long mtime, long atime)
throws IOException {
checkNNStartup();
namesystem.setTimes(src, mtime, atime);
}
@Override // ClientProtocol
public void createSymlink(String target, String link, FsPermission dirPerms,
boolean createParent) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
/* We enforce the MAX_PATH_LENGTH limit even though a symlink target
* URI may refer to a non-HDFS file system.
*/
if (!checkPathLength(link)) {
throw new IOException("Symlink path exceeds " + MAX_PATH_LENGTH +
" character limit");
}
final UserGroupInformation ugi = getRemoteUser();
boolean success = false;
try {
PermissionStatus perm = new PermissionStatus(ugi.getShortUserName(),
null, dirPerms);
namesystem.createSymlink(target, link, perm, createParent,
cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
public String getLinkTarget(String path) throws IOException {
checkNNStartup();
metrics.incrGetLinkTargetOps();
HdfsFileStatus stat = null;
try {
stat = namesystem.getFileInfo(path, false);
} catch (UnresolvedPathException e) {
return e.getResolvedPath().toString();
} catch (UnresolvedLinkException e) {
// The NameNode should only throw an UnresolvedPathException
throw new AssertionError("UnresolvedLinkException thrown");
}
if (stat == null) {
throw new FileNotFoundException("File does not exist: " + path);
} else if (!stat.isSymlink()) {
throw new IOException("Path " + path + " is not a symbolic link");
}
return stat.getSymlink();
}
@Override // DatanodeProtocol
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
throws IOException {
checkNNStartup();
verifySoftwareVersion(nodeReg);
namesystem.registerDatanode(nodeReg);
return nodeReg;
}
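  /**
   * Handle a datanode heartbeat: record liveness and storage/cache usage,
   * and return any commands (e.g. replication or invalidation work) queued
   * for the reporting node.
   */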
@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
int xmitsInProgress, int xceiverCount,
int failedVolumes, VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
slowPeers, slowDisks);
}
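  /**
   * Process full block reports, one storage at a time, on the block
   * manager's operation queue; a datanode that reports before registering
   * is told to register first.
   */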
@Override // DatanodeProtocol
public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
String poolId, final StorageBlockReport[] reports,
final BlockReportContext context) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
    if (blockStateChangeLog.isDebugEnabled()) {
blockStateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+ "from " + nodeReg + ", reports.length=" + reports.length);
}
final BlockManager bm = namesystem.getBlockManager();
boolean noStaleStorages = false;
try {
if (bm.checkBlockReportLease(context, nodeReg)) {
for (int r = 0; r < reports.length; r++) {
final BlockListAsLongs blocks = reports[r].getBlocks();
//
// BlockManager.processReport accumulates information of prior calls
// for the same node and storage, so the value returned by the last
// call of this loop is the final updated value for noStaleStorage.
//
final int index = r;
noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
@Override
public Boolean call() throws IOException {
return bm.processReport(nodeReg, reports[index].getStorage(),
blocks, context);
}
});
metrics.incrStorageBlockReportOps();
}
}
} catch (UnregisteredNodeException une) {
LOG.debug("Datanode {} is attempting to report but not register yet.",
nodeReg);
return RegisterCommand.REGISTER;
}
bm.removeBRLeaseIfNeeded(nodeReg, context);
BlockManagerFaultInjector.getInstance().
incomingBlockReportRpc(nodeReg, context);
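    // Only ask the datanode to finalize its block pool when the upgrade is
    // finalized, no rolling upgrade is in progress, this NameNode is active,
    // and the report left no stale storages.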
if (nn.getFSImage().isUpgradeFinalized() &&
!namesystem.isRollingUpgrade() &&
nn.isActiveState() &&
noStaleStorages) {
return new FinalizeCommand(poolId);
}
return null;
}
@Override
public DatanodeCommand cacheReport(DatanodeRegistration nodeReg,
String poolId, List<Long> blockIds) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
if (blockStateChangeLog.isDebugEnabled()) {
blockStateChangeLog.debug("*BLOCK* NameNode.cacheReport: "
+ "from " + nodeReg + " " + blockIds.size() + " blocks");
}
namesystem.getCacheManager().processCacheReport(nodeReg, blockIds);
return null;
}
@Override // DatanodeProtocol
public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg,
String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
metrics.incrBlockReceivedAndDeletedOps();
    if (blockStateChangeLog.isDebugEnabled()) {
      blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
          + "from " + nodeReg + " " + receivedAndDeletedBlocks.length
          + " blocks.");
}
final BlockManager bm = namesystem.getBlockManager();
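    // Enqueue each storage's incremental report for asynchronous processing
    // so the RPC handler thread is not held up; failures are logged and
    // corrected by a later heartbeat or block report.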
for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
bm.enqueueBlockOp(new Runnable() {
@Override
public void run() {
try {
namesystem.processIncrementalBlockReport(nodeReg, r);
} catch (Exception ex) {
// usually because the node is unregistered/dead. next heartbeat
// will correct the problem
blockStateChangeLog.error(
"*BLOCK* NameNode.blockReceivedAndDeleted: "
+ "failed from " + nodeReg + ": " + ex.getMessage());
}
}
});
}
}
@Override // DatanodeProtocol
public void errorReport(DatanodeRegistration nodeReg,
int errorCode, String msg) throws IOException {
checkNNStartup();
String dnName =
(nodeReg == null) ? "Unknown DataNode" : nodeReg.toString();
if (errorCode == DatanodeProtocol.NOTIFY) {
LOG.info("Error report from " + dnName + ": " + msg);
return;
}
verifyRequest(nodeReg);
if (errorCode == DatanodeProtocol.DISK_ERROR) {
LOG.warn("Disk error on " + dnName + ": " + msg);
} else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
LOG.warn("Fatal disk error on " + dnName + ": " + msg);
namesystem.getBlockManager().getDatanodeManager().removeDatanode(nodeReg);
} else {
LOG.info("Error report from " + dnName + ": " + msg);
}
}
@Override // DatanodeProtocol, NamenodeProtocol
public NamespaceInfo versionRequest() throws IOException {
checkNNStartup();
return namesystem.getNamespaceInfo();
}
@Override // DatanodeLifelineProtocol
public void sendLifeline(DatanodeRegistration nodeReg, StorageReport[] report,
long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
namesystem.handleLifeline(nodeReg, report, dnCacheCapacity, dnCacheUsed,
xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary);
}
/**
* Verifies the given registration.
*
* @param nodeReg node registration
* @throws UnregisteredNodeException if the registration is invalid
*/
private void verifyRequest(NodeRegistration nodeReg) throws IOException {
// verify registration ID
final String id = nodeReg.getRegistrationID();
final String expectedID = namesystem.getRegistrationID();
if (!expectedID.equals(id)) {
LOG.warn("Registration IDs mismatched: the "
+ nodeReg.getClass().getSimpleName() + " ID is " + id
+ " but the expected ID is " + expectedID);
throw new UnregisteredNodeException(nodeReg);
}
}
@Override // RefreshAuthorizationPolicyProtocol
public void refreshServiceAcl() throws IOException {
checkNNStartup();
if (!serviceAuthEnabled) {
throw new AuthorizationException(
"Service Level Authorization not enabled!");
}
this.clientRpcServer.refreshServiceAcl(new Configuration(),
new HDFSPolicyProvider());
if (this.serviceRpcServer != null) {
this.serviceRpcServer.refreshServiceAcl(new Configuration(),
new HDFSPolicyProvider());
}
}
@Override // RefreshUserMappingsProtocol
public void refreshUserToGroupsMappings() throws IOException {
LOG.info("Refreshing all user-to-groups mappings. Requested by user: " +
getRemoteUser().getShortUserName());
Groups.getUserToGroupsMappingService().refresh();
}
@Override // RefreshUserMappingsProtocol
public void refreshSuperUserGroupsConfiguration() {
LOG.info("Refreshing SuperUser proxy group mapping list ");
ProxyUsers.refreshSuperUserGroupsConfiguration();
}
@Override // RefreshCallQueueProtocol
public void refreshCallQueue() {
LOG.info("Refreshing call queue.");
Configuration conf = new Configuration();
clientRpcServer.refreshCallQueue(conf);
if (this.serviceRpcServer != null) {
serviceRpcServer.refreshCallQueue(conf);
}
}
@Override // GenericRefreshProtocol
public Collection<RefreshResponse> refresh(String identifier, String[] args) {
// Let the registry handle as needed
return RefreshRegistry.defaultRegistry().dispatch(identifier, args);
}
@Override // GetUserMappingsProtocol
public String[] getGroupsForUser(String user) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Getting groups for user " + user);
}
return UserGroupInformation.createRemoteUser(user).getGroupNames();
}
@Override // HAServiceProtocol
public synchronized void monitorHealth() throws HealthCheckFailedException,
AccessControlException, IOException {
checkNNStartup();
nn.monitorHealth();
}
@Override // HAServiceProtocol
public synchronized void transitionToActive(StateChangeRequestInfo req)
throws ServiceFailedException, AccessControlException, IOException {
checkNNStartup();
nn.checkHaStateChange(req);
nn.transitionToActive();
}
@Override // HAServiceProtocol
public synchronized void transitionToStandby(StateChangeRequestInfo req)
throws ServiceFailedException, AccessControlException, IOException {
checkNNStartup();
nn.checkHaStateChange(req);
nn.transitionToStandby();
}
@Override // HAServiceProtocol
public synchronized void transitionToObserver(StateChangeRequestInfo req)
throws ServiceFailedException, AccessControlException, IOException {
checkNNStartup();
nn.checkHaStateChange(req);
nn.transitionToObserver();
}
@Override // HAServiceProtocol
public synchronized HAServiceStatus getServiceStatus()
throws AccessControlException, ServiceFailedException, IOException {
checkNNStartup();
return nn.getServiceStatus();
}
/**
* Verify version.
* @param version layout version
* @throws IOException on layout version mismatch
*/
void verifyLayoutVersion(int version) throws IOException {
if (version != HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
throw new IncorrectVersionException(
HdfsServerConstants.NAMENODE_LAYOUT_VERSION, version, "data node");
}
}
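/**
* Verifies the DataNode's reported software version: it must be at least
* the configured minimum version, and if it differs from the NameNode's
* version the two storage CTimes must match, which is the expected state
* during a rolling upgrade.
*/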
private void verifySoftwareVersion(DatanodeRegistration dnReg)
throws IncorrectVersionException {
String dnVersion = dnReg.getSoftwareVersion();
if (VersionUtil.compareVersions(dnVersion, minimumDataNodeVersion) < 0) {
IncorrectVersionException ive = new IncorrectVersionException(
minimumDataNodeVersion, dnVersion, "DataNode", "NameNode");
LOG.warn(ive.getMessage() + " DN: " + dnReg);
throw ive;
}
String nnVersion = VersionInfo.getVersion();
if (!dnVersion.equals(nnVersion)) {
String messagePrefix = "Reported DataNode version '" + dnVersion +
"' of DN " + dnReg + " does not match NameNode version '" +
nnVersion + "'";
long nnCTime = nn.getFSImage().getStorage().getCTime();
long dnCTime = dnReg.getStorageInfo().getCTime();
if (nnCTime != dnCTime) {
IncorrectVersionException ive = new IncorrectVersionException(
messagePrefix + " and CTime of DN ('" + dnCTime +
"') does not match CTime of NN ('" + nnCTime + "')");
LOG.warn(ive.toString(), ive);
throw ive;
} else {
LOG.info(messagePrefix +
". Note: This is normal during a rolling upgrade.");
}
}
}
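/**
* Returns the remote address of the RPC client served by the current
* handler thread, or an empty string if the caller is not an RPC client.
*/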
private static String getClientMachine() {
String clientMachine = Server.getRemoteAddress();
if (clientMachine == null) { // not an RPC client
clientMachine = "";
}
return clientMachine;
}
/**
* Return the QOP (quality of protection) established for the client that
* the current handler thread is serving. This assumes the SASL negotiation
* has already completed; otherwise null is returned.
*
* @return the established QOP of this client, or null if negotiation has
* not completed.
*/
public static String getEstablishedClientQOP() {
return Server.getEstablishedQOP();
}
@Override
public DataEncryptionKey getDataEncryptionKey() throws IOException {
checkNNStartup();
return namesystem.getBlockManager().generateDataEncryptionKey();
}
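/**
* Creates a snapshot of the given directory. Like the other mutating
* ClientProtocol operations below, it uses the RetryCache for at-most-once
* semantics: a retried call that finds a completed entry for the same
* client call id gets the cached response replayed, while a fresh attempt
* records its outcome (and payload, here the snapshot path) in the cache.
*/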
@Override
public String createSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
checkNNStartup();
if (!checkPathLength(snapshotRoot)) {
throw new IOException("createSnapshot: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (String) cacheEntry.getPayload();
}
metrics.incrCreateSnapshotOps();
String ret = null;
try {
ret = namesystem.createSnapshot(snapshotRoot, snapshotName,
cacheEntry != null);
} finally {
RetryCache.setState(cacheEntry, ret != null, ret);
}
return ret;
}
@Override
public void deleteSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
checkNNStartup();
if (snapshotName == null || snapshotName.isEmpty()) {
throw new IOException("The snapshot name is null or empty.");
}
namesystem.checkOperation(OperationCategory.WRITE);
metrics.incrDeleteSnapshotOps();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.deleteSnapshot(snapshotRoot, snapshotName, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
public void allowSnapshot(String snapshotRoot) throws IOException {
checkNNStartup();
metrics.incrAllowSnapshotOps();
namesystem.allowSnapshot(snapshotRoot);
}
@Override // ClientProtocol
public void disallowSnapshot(String snapshotRoot) throws IOException {
checkNNStartup();
metrics.incrDisAllowSnapshotOps();
namesystem.disallowSnapshot(snapshotRoot);
}
@Override // ClientProtocol
public void renameSnapshot(String snapshotRoot, String snapshotOldName,
String snapshotNewName) throws IOException {
checkNNStartup();
if (snapshotNewName == null || snapshotNewName.isEmpty()) {
throw new IOException("The new snapshot name is null or empty.");
}
namesystem.checkOperation(OperationCategory.WRITE);
metrics.incrRenameSnapshotOps();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.renameSnapshot(snapshotRoot, snapshotOldName,
snapshotNewName, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
throws IOException {
checkNNStartup();
SnapshottableDirectoryStatus[] status = namesystem
.getSnapshottableDirListing();
metrics.incrListSnapshottableDirOps();
return status;
}
@Override // ClientProtocol
public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
String earlierSnapshotName, String laterSnapshotName) throws IOException {
checkNNStartup();
SnapshotDiffReport report = namesystem.getSnapshotDiffReport(snapshotRoot,
earlierSnapshotName, laterSnapshotName);
metrics.incrSnapshotDiffReportOps();
return report;
}
@Override // ClientProtocol
public long addCacheDirective(
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry =
RetryCache.waitForCompletion(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (Long) cacheEntry.getPayload();
}
boolean success = false;
long ret = 0;
try {
ret = namesystem.addCacheDirective(directive, flags, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success, ret);
}
return ret;
}
@Override // ClientProtocol
public void modifyCacheDirective(
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
boolean success = false;
try {
namesystem.modifyCacheDirective(directive, flags, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
public void removeCacheDirective(long id) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
boolean success = false;
try {
namesystem.removeCacheDirective(id, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId,
CacheDirectiveInfo filter) throws IOException {
checkNNStartup();
if (filter == null) {
filter = new CacheDirectiveInfo.Builder().build();
}
return namesystem.listCacheDirectives(prevId, filter);
}
@Override // ClientProtocol
public void addCachePool(CachePoolInfo info) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.addCachePool(info, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
public void modifyCachePool(CachePoolInfo info) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.modifyCachePool(info, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
public void removeCachePool(String cachePoolName) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
boolean success = false;
try {
namesystem.removeCachePool(cachePoolName, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
public BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
throws IOException {
checkNNStartup();
return namesystem.listCachePools(prevKey != null ? prevKey : "");
}
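// The ACL operations below delegate directly to FSNamesystem and skip the
// RetryCache: they are idempotent, so a client retry simply reapplies the
// same change.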
@Override // ClientProtocol
public void modifyAclEntries(String src, List<AclEntry> aclSpec)
throws IOException {
checkNNStartup();
namesystem.modifyAclEntries(src, aclSpec);
}
@Override // ClientProtocol
public void removeAclEntries(String src, List<AclEntry> aclSpec)
throws IOException {
checkNNStartup();
namesystem.removeAclEntries(src, aclSpec);
}
@Override // ClientProtocol
public void removeDefaultAcl(String src) throws IOException {
checkNNStartup();
namesystem.removeDefaultAcl(src);
}
@Override // ClientProtocol
public void removeAcl(String src) throws IOException {
checkNNStartup();
namesystem.removeAcl(src);
}
@Override // ClientProtocol
public void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
checkNNStartup();
namesystem.setAcl(src, aclSpec);
}
@Override // ClientProtocol
public AclStatus getAclStatus(String src) throws IOException {
checkNNStartup();
return namesystem.getAclStatus(src);
}
@Override // ClientProtocol
public void createEncryptionZone(String src, String keyName)
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
boolean success = false;
try {
namesystem.createEncryptionZone(src, keyName, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
public EncryptionZone getEZForPath(String src)
throws IOException {
checkNNStartup();
return namesystem.getEZForPath(src);
}
@Override // ClientProtocol
public BatchedEntries<EncryptionZone> listEncryptionZones(
long prevId) throws IOException {
checkNNStartup();
return namesystem.listEncryptionZones(prevId);
}
@Override // ClientProtocol
public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.setXAttr(src, xAttr, flag, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
public List<XAttr> getXAttrs(String src, List<XAttr> xAttrs)
throws IOException {
checkNNStartup();
return namesystem.getXAttrs(src, xAttrs);
}
@Override // ClientProtocol
public List<XAttr> listXAttrs(String src) throws IOException {
checkNNStartup();
return namesystem.listXAttrs(src);
}
@Override // ClientProtocol
public void removeXAttr(String src, XAttr xAttr) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.removeXAttr(src, xAttr, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
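/**
* Throws RetriableException while the NameNode is still starting up, so
* that clients back off and retry instead of failing outright.
*/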
private void checkNNStartup() throws IOException {
if (!this.nn.isStarted()) {
String message = NameNode.composeNotStartedMessage(this.nn.getRole());
throw new RetriableException(message);
}
}
@Override // ClientProtocol
public void checkAccess(String path, FsAction mode) throws IOException {
checkNNStartup();
namesystem.checkAccess(path, mode);
}
@Override // ClientProtocol
public long getCurrentEditLogTxid() throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.READ); // only active
namesystem.checkSuperuserPrivilege();
// if it's not yet open for write, we may be in the process of transitioning
// from standby to active and may not yet know what the latest committed
// txid is
return namesystem.getEditLog().isOpenForWrite() ?
namesystem.getEditLog().getLastWrittenTxId() : -1;
}
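/**
* Reads the next op from the edit log stream, returning null instead of
* propagating the errors that occur when a segment is deleted or finalized
* underneath the reader.
*/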
private static FSEditLogOp readOp(EditLogInputStream elis)
throws IOException {
try {
return elis.readOp();
// we can get the below two exceptions if a segment is deleted
// (because we have accumulated too many edits) or (for the local journal/
// no-QJM case only) if an in-progress segment is finalized under us ...
// no need to throw an exception back to the client in this case
} catch (FileNotFoundException e) {
LOG.debug("Tried to read from deleted or moved edit log segment", e);
return null;
} catch (HttpGetFailedException e) {
LOG.debug("Tried to read from deleted edit log segment", e);
return null;
}
}
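/**
* Serves the inotify API: returns batches of namespace events starting at
* the given transaction id, reading at most the configured number of
* events per call. A minimal client-side sketch (the host name below is a
* placeholder; the usual entry point is HdfsAdmin#getInotifyEventStream):
* <pre>
* HdfsAdmin admin = new HdfsAdmin(
*     URI.create("hdfs://namenode:8020"), new Configuration());
* DFSInotifyEventInputStream stream = admin.getInotifyEventStream();
* EventBatch batch = stream.take(); // blocks until events are available
* </pre>
*/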
@Override // ClientProtocol
public EventBatchList getEditsFromTxid(final long txid) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.READ); // only active
namesystem.checkSuperuserPrivilege();
final int maxEventsPerRPC = nn.getConf().getInt(
DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY,
DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT);
final FSEditLog log = namesystem.getFSImage().getEditLog();
final long syncTxid = log.getSyncTxId();
// If we haven't synced anything yet, we can only read finalized
// segments since we can't reliably determine which txns in in-progress
// segments have actually been committed (e.g. written to a quorum of JNs).
// If we have synced txns, we can definitely read up to syncTxid since
// syncTxid is only updated after a transaction is committed to all
// journals. (In-progress segments written by old writers are already
// discarded for us, so if we read any in-progress segments they are
// guaranteed to have been written by this NameNode.)
final boolean readInProgress = syncTxid > 0;
// Run the actual edit-fetching as the NN login user (doAs). Notably this
// is necessary when polling the remote edits via https. We have already
// validated that the client is a superuser from the NN RPC, so running as
// the login user here is safe.
EventBatchList ret = SecurityUtil.doAsLoginUser(
new PrivilegedExceptionAction<EventBatchList>() {
@Override
public EventBatchList run() throws IOException {
return getEventBatchList(syncTxid, txid, log, readInProgress,
maxEventsPerRPC);
}
});
return ret;
}
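/**
* Scans the selected edit log streams, which are assumed to be ordered by
* starting txid, translating each op into inotify event batches until
* maxEventsPerRPC events have been gathered or syncTxid has been reached,
* and returns the batches together with the first and last txids seen.
*/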
private EventBatchList getEventBatchList(long syncTxid, long txid,
FSEditLog log, boolean readInProgress, int maxEventsPerRPC)
throws IOException {
List<EventBatch> batches = Lists.newArrayList();
int totalEvents = 0;
long maxSeenTxid = -1;
long firstSeenTxid = -1;
if (syncTxid > 0 && txid > syncTxid) {
// we can't read past syncTxid, so there's no point in going any further
return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
}
Collection<EditLogInputStream> streams = null;
try {
streams = log.selectInputStreams(txid, 0, null, readInProgress);
} catch (IllegalStateException e) {
// can happen if we have transitioned out of active and haven't yet
// transitioned to standby and are using QJM -- the edit log will be
// closed and this exception will result
LOG.info("NN is transitioning from active to standby and FSEditLog " +
"is closed -- could not read edits");
return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
}
boolean breakOuter = false;
for (EditLogInputStream elis : streams) {
// our assumption in this code is the EditLogInputStreams are ordered by
// starting txid
try {
FSEditLogOp op = null;
while ((op = readOp(elis)) != null) {
// break out of here in the unlikely event that syncTxid is so
// out of date that its segment has already been deleted, so the first
// txid we get is greater than syncTxid
if (syncTxid > 0 && op.getTransactionId() > syncTxid) {
breakOuter = true;
break;
}
EventBatch eventBatch = InotifyFSEditLogOpTranslator.translate(op);
if (eventBatch != null) {
batches.add(eventBatch);
totalEvents += eventBatch.getEvents().length;
}
if (op.getTransactionId() > maxSeenTxid) {
maxSeenTxid = op.getTransactionId();
}
if (firstSeenTxid == -1) {
firstSeenTxid = op.getTransactionId();
}
if (totalEvents >= maxEventsPerRPC || (syncTxid > 0 &&
op.getTransactionId() == syncTxid)) {
// we're done
breakOuter = true;
break;
}
}
} finally {
elis.close();
}
if (breakOuter) {
break;
}
}
return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
}
@Override // TraceAdminProtocol
public SpanReceiverInfo[] listSpanReceivers() throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return nn.tracerConfigurationManager.listSpanReceivers();
}
@Override // TraceAdminProtocol
public long addSpanReceiver(SpanReceiverInfo info) throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return nn.tracerConfigurationManager.addSpanReceiver(info);
}
@Override // TraceAdminProtocol
public void removeSpanReceiver(long id) throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
nn.tracerConfigurationManager.removeSpanReceiver(id);
}
@Override // ReconfigurationProtocol
public void startReconfiguration() throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
nn.startReconfigurationTask();
}
@Override // ReconfigurationProtocol
public ReconfigurationTaskStatus getReconfigurationStatus()
throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return nn.getReconfigurationTaskStatus();
}
@Override // ReconfigurationProtocol
public List<String> listReconfigurableProperties() throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return Lists.newArrayList(nn.getReconfigurableProperties());
}
}