blob: efd5fc5be8e61aca0b6572017cc711d17790fc77 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding
* copyright ownership. The ASF licenses this file to you under the Apache
* License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a
* copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software
* distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.server;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.CloseContainerWatcher;
import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.replication
.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
import org.apache.hadoop.hdds.scm.node.NewNodeHandler;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler;
import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler;
import org.apache.hadoop.hdds.scm.pipelines.PipelineReportHandler;
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.common.StorageInfo;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.apache.hadoop.util.ExitUtil.terminate;
/**
* StorageContainerManager is the main entry point for the service that
* provides information about
* which SCM nodes host containers.
*
* <p>DataNodes report to StorageContainerManager using heartbeat messages.
* SCM allocates containers
* and returns a pipeline.
*
* <p>A client once it gets a pipeline (a list of datanodes) will connect to
* the datanodes and
* create a container, which then can be used to store data.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
public final class StorageContainerManager extends ServiceRuntimeInfoImpl
implements SCMMXBean {
// Class-wide SLF4J logger.
private static final Logger LOG = LoggerFactory
.getLogger(StorageContainerManager.class);
// Command-line usage text, assembled from the StartupOption flag names.
// Passed to DFSUtil.parseHelpArgument and printed by main/printUsage.
private static final String USAGE =
"Usage: \n ozone scm [genericOptions] "
+ "[ "
+ StartupOption.INIT.getName()
+ " [ "
+ StartupOption.CLUSTERID.getName()
+ " <cid> ] ]\n "
+ "ozone scm [genericOptions] [ "
+ StartupOption.GENCLUSTERID.getName()
+ " ]\n "
+ "ozone scm [ "
+ StartupOption.HELP.getName()
+ " ]\n";
/**
* SCM metrics. Static and (re)assigned by initMetrics(), which the
* constructor calls; read through getMetrics().
*/
private static SCMMetrics metrics;
/*
* RPC Endpoints exposed by SCM. Created in the constructor, started in
* start() and stopped in stop().
*/
private final SCMDatanodeProtocolServer datanodeProtocolServer;
private final SCMBlockProtocolServer blockProtocolServer;
private final SCMClientProtocolServer clientProtocolServer;
/*
* State Managers of SCM.
*/
private final NodeManager scmNodeManager;
private final ContainerManager containerManager;
private final BlockManager scmBlockManager;
private final SCMStorage scmStorage;
// Event bus on which the constructor registers all SCM event handlers.
private final EventQueue eventQueue;
/*
* HTTP endpoint for JMX access.
*/
private final StorageContainerManagerHttpServer httpServer;
/**
* SCM super user: the name of the current user as reported by
* UserGroupInformation when the constructor runs.
*/
private final String scmUsername;
// Configured administrators plus scmUsername; consulted by checkAdminAccess.
private final Collection<String> scmAdminUsernames;
/**
* SCM mxbean. Set by registerMXBean() and reset to null by
* unregisterMXBean().
*/
private ObjectName scmInfoBeanName;
/**
* Key = DatanodeUuid, value = ContainerStat.
* Not final: assigned by initContainerReportCache(), which the constructor
* calls. Also used as the monitor object for the synchronized accessors.
*/
private Cache<String, ContainerStat> containerReportCache;
// Started in start() and stopped in stop().
private final ReplicationManager replicationManager;
// Lease manager shared by the command watchers and the replication manager.
private final LeaseManager<Long> commandWatcherLeaseManager;
// Supplies the listeners registered for START_REPLICATION and
// CHILL_MODE_STATUS events.
private final ReplicationActivityStatus replicationStatus;
// Backs isInChillMode(), exitChillMode() and getCurrentContainerThreshold().
private final SCMChillModeManager scmChillModeManager;
/**
* Creates a new StorageContainerManager. Configuration will be updated
* with information on the
* actual listening addresses used for RPC servers.
*
* <p>The constructor only builds and wires the components: it creates the
* state managers, the event handlers, the watchers and the RPC/HTTP server
* objects, registers every handler on the event queue, and registers the
* MXBean. The RPC servers, lease manager, block manager and replication
* services are started later by {@link #start()}.
*
* @param conf configuration
* @throws SCMException with SCM_NOT_INITIALIZED if the SCM storage is not in
* the INITIALIZED state
* @throws IOException if one of the components fails to initialize
*/
private StorageContainerManager(OzoneConfiguration conf) throws IOException {
// Cache size handed to the container manager (config key is in MB).
final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
// Metrics must exist before the report cache: its removal listener
// dereferences the static metrics field.
StorageContainerManager.initMetrics();
initContainerReportCache(conf);
scmStorage = new SCMStorage(conf);
// Refuse to come up on storage that was never initialized (see scmInit).
if (scmStorage.getState() != StorageState.INITIALIZED) {
throw new SCMException("SCM not initialized.", ResultCodes
.SCM_NOT_INITIALIZED);
}
eventQueue = new EventQueue();
// NOTE(review): `this` is handed to SCMNodeManager (and to the protocol
// servers below) before the constructor has finished assigning fields.
scmNodeManager = new SCMNodeManager(
conf, scmStorage.getClusterID(), this, eventQueue);
containerManager = new SCMContainerManager(
conf, getScmNodeManager(), cacheSize, eventQueue);
scmBlockManager = new BlockManagerImpl(
conf, getScmNodeManager(), containerManager, eventQueue);
replicationStatus = new ReplicationActivityStatus();
// Event handlers; they are registered on the event queue further down.
CloseContainerEventHandler closeContainerHandler =
new CloseContainerEventHandler(containerManager);
NodeReportHandler nodeReportHandler =
new NodeReportHandler(scmNodeManager);
PipelineReportHandler pipelineReportHandler =
new PipelineReportHandler(
containerManager.getPipelineSelector());
CommandStatusReportHandler cmdStatusReportHandler =
new CommandStatusReportHandler();
NewNodeHandler newNodeHandler = new NewNodeHandler(scmNodeManager);
StaleNodeHandler staleNodeHandler =
new StaleNodeHandler(containerManager.getPipelineSelector());
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
getContainerManager().getStateManager());
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
PendingDeleteHandler pendingDeleteHandler =
new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
ContainerReportHandler containerReportHandler =
new ContainerReportHandler(containerManager, scmNodeManager,
replicationStatus);
// The chill mode manager is given the containers known to the state manager
// at construction time.
scmChillModeManager = new SCMChillModeManager(conf,
getContainerManager().getStateManager().getAllContainers(),
eventQueue);
PipelineActionEventHandler pipelineActionEventHandler =
new PipelineActionEventHandler();
PipelineCloseHandler pipelineCloseHandler =
new PipelineCloseHandler(containerManager.getPipelineSelector());
// Watcher timeout, read in milliseconds; used as the lease manager timeout.
long watcherTimeout =
conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
watcherTimeout);
// NOTE(review): declared with a raw type; the type argument is not visible
// from this file.
RetriableDatanodeEventWatcher retriableDatanodeEventWatcher =
new RetriableDatanodeEventWatcher<>(
SCMEvents.RETRIABLE_DATANODE_COMMAND,
SCMEvents.DELETE_BLOCK_STATUS,
commandWatcherLeaseManager);
retriableDatanodeEventWatcher.start(eventQueue);
//TODO: support configurable containerPlacement policy
ContainerPlacementPolicy containerPlacementPolicy =
new SCMContainerPlacementCapacity(scmNodeManager, conf);
replicationManager = new ReplicationManager(containerPlacementPolicy,
containerManager.getStateManager(), eventQueue,
commandWatcherLeaseManager);
// setup CloseContainer watcher
CloseContainerWatcher closeContainerWatcher =
new CloseContainerWatcher(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
SCMEvents.CLOSE_CONTAINER_STATUS, commandWatcherLeaseManager,
containerManager);
closeContainerWatcher.start(eventQueue);
// Administrators come from configuration; the user running this process is
// always added to the list.
scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
.OZONE_ADMINISTRATORS);
scmUsername = UserGroupInformation.getCurrentUser().getUserName();
if (!scmAdminUsernames.contains(scmUsername)) {
scmAdminUsernames.add(scmUsername);
}
// RPC and HTTP endpoints are only constructed here; start() starts them.
datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this,
eventQueue);
blockProtocolServer = new SCMBlockProtocolServer(conf, this);
clientProtocolServer = new SCMClientProtocolServer(conf, this);
httpServer = new StorageContainerManagerHttpServer(conf);
// Register every handler with the event type it consumes.
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler);
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);
eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
eventQueue.addHandler(SCMEvents.START_REPLICATION,
replicationStatus.getReplicationStatusListener());
// CHILL_MODE_STATUS has several consumers: the replication status listener,
// the block manager and the client protocol server (the latter two below).
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
replicationStatus.getChillModeStatusListener());
eventQueue
.addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
// The casts below rely on the concrete implementations created above being
// the event handlers.
eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS,
(DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog());
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
pipelineActionEventHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler);
eventQueue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
scmChillModeManager);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
(BlockManagerImpl) scmBlockManager);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, clientProtocolServer);
eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
// Expose this instance over JMX; undone by unregisterMXBean() in stop().
registerMXBean();
}
/**
 * Formats the one-line startup log message for an RPC server.
 *
 * @param description human-readable name of the RPC server
 * @param addr address the server listens on, or {@code null} when the
 *     server has no address
 * @return "{description} is listening at {addr}" when an address is given,
 *     otherwise "{description} not started"
 */
public static String buildRpcServerStartMessage(String description,
    InetSocketAddress addr) {
  if (addr == null) {
    return String.format("%s not started", description);
  }
  final String listenAddress = addr.toString();
  return String.format("%s is listening at %s", description, listenAddress);
}
/**
 * Builds an RPC server bound to the given address and registers the
 * protocol implementation on it through {@code DFSUtil.addPBProtocol}.
 * The server is returned as built; this method does not call
 * {@code start()} on it.
 *
 * @param conf configuration
 * @param addr configured address of RPC server; its host string and port
 *     are used as the bind address
 * @param protocol RPC protocol provided by RPC server
 * @param instance RPC protocol implementation instance
 * @param handlerCount RPC server handler count
 * @return RPC server
 * @throws IOException if there is an I/O error while creating RPC server
 */
public static RPC.Server startRpcServer(
    OzoneConfiguration conf,
    InetSocketAddress addr,
    Class<?> protocol,
    BlockingService instance,
    int handlerCount)
    throws IOException {
  RPC.Builder serverBuilder = new RPC.Builder(conf);
  serverBuilder = serverBuilder.setProtocol(protocol);
  serverBuilder = serverBuilder.setInstance(instance);
  serverBuilder = serverBuilder.setBindAddress(addr.getHostString());
  serverBuilder = serverBuilder.setPort(addr.getPort());
  serverBuilder = serverBuilder.setNumHandlers(handlerCount);
  serverBuilder = serverBuilder.setVerbose(false);
  serverBuilder = serverBuilder.setSecretManager(null);

  final RPC.Server server = serverBuilder.build();
  DFSUtil.addPBProtocol(conf, protocol, instance, server);
  return server;
}
/**
 * Command-line entry point: parses the arguments, creates the SCM and, if
 * one was created, starts it and blocks until its RPC servers finish.
 *
 * @param argv arguments
 * @throws IOException declared for callers; every Throwable raised during
 *     startup is caught here, logged and passed to {@code terminate(1, t)}
 */
public static void main(String[] argv) throws IOException {
  final boolean helpRequested =
      DFSUtil.parseHelpArgument(argv, USAGE, System.out, true);
  if (helpRequested) {
    System.exit(0);
  }

  try {
    final OzoneConfiguration configuration = new OzoneConfiguration();
    final GenericOptionsParser optionsParser =
        new GenericOptionsParser(configuration, argv);
    if (!optionsParser.isParseSuccessful()) {
      System.err.println("USAGE: " + USAGE + "\n");
      optionsParser.printGenericCommandUsage(System.err);
      System.exit(1);
    }

    // Only the arguments left over after the generic options are SCM options.
    final String[] scmArgs = optionsParser.getRemainingArgs();
    final StorageContainerManager manager =
        createSCM(scmArgs, configuration, true);
    if (manager != null) {
      manager.start();
      manager.join();
    }
  } catch (Throwable failure) {
    LOG.error("Failed to start the StorageContainerManager.", failure);
    terminate(1, failure);
  }
}
/**
 * Prints the usage text followed by an extra blank line.
 *
 * @param out stream to print to
 */
private static void printUsage(PrintStream out) {
  final String usageText = USAGE + "\n";
  out.println(usageText);
}
/**
 * Creates an SCM instance from command-line style arguments without the
 * startup banner.
 *
 * <p>Intended for unit tests: it delegates to the three-argument overload
 * with {@code printBanner} set to {@code false}, so the startup/shutdown
 * message is never logged.
 *
 * @param args command line arguments.
 * @param conf HDDS configuration
 * @return SCM instance, or {@code null} when the arguments select an option
 *     that does not create one
 * @throws IOException if creating the SCM fails
 */
@VisibleForTesting
public static StorageContainerManager createSCM(
    String[] args, OzoneConfiguration conf) throws IOException {
  final boolean printBanner = false;
  return createSCM(args, conf, printBanner);
}
/**
 * Creates an SCM instance, or performs the one-shot action selected by the
 * command-line arguments.
 *
 * <p>Outcomes by parsed option:
 * <ul>
 *   <li>invalid arguments: prints usage and terminates with status 1;</li>
 *   <li>HELP: prints usage and terminates with status 0 (no banner);</li>
 *   <li>INIT: runs {@link #scmInit} and terminates with 0 on success,
 *       1 on failure;</li>
 *   <li>GENCLUSTERID: prints a new cluster id and terminates with 0;</li>
 *   <li>anything else: constructs and returns a new SCM.</li>
 * </ul>
 *
 * @param args command-line arguments; {@code null} is treated as empty
 * @param conf HDDS configuration
 * @param printBanner if true, then log a verbose startup message.
 * @return SCM instance, or {@code null} on every path that calls terminate
 * @throws IOException if initialization or construction fails
 */
private static StorageContainerManager createSCM(
    String[] args,
    OzoneConfiguration conf,
    boolean printBanner) throws IOException {
  final String[] argv = (args != null) ? args : new String[0];

  if (!HddsUtils.isHddsEnabled(conf)) {
    System.err.println(
        "SCM cannot be started in secure mode or when " + OZONE_ENABLED
            + " is set to false");
    System.exit(1);
  }

  final StartupOption option = parseArguments(argv);
  if (option == null) {
    printUsage(System.err);
    terminate(1);
    return null;
  }
  if (option == StartupOption.HELP) {
    printUsage(System.err);
    terminate(0);
    return null;
  }

  // Every remaining option logs the banner (when requested) before acting.
  if (printBanner) {
    StringUtils.startupShutdownMessage(StorageContainerManager.class, argv,
        LOG);
  }

  if (option == StartupOption.INIT) {
    final int exitStatus = scmInit(conf) ? 0 : 1;
    terminate(exitStatus);
    return null;
  }
  if (option == StartupOption.GENCLUSTERID) {
    System.out.println("Generating new cluster id:");
    System.out.println(StorageInfo.newClusterID());
    terminate(0);
    return null;
  }
  return new StorageContainerManager(conf);
}
/**
 * Initializes the SCM storage (version information) if that has not been
 * done yet.
 *
 * <p>When the storage is already initialized, the existing cluster id is
 * reported and reused. Otherwise the cluster id given with the INIT option
 * (if any) is applied before the storage is initialized.
 *
 * @param conf OzoneConfiguration
 * @return true if the storage is, or already was, initialized; false if an
 *     IOException occurred during the initialization step (it is logged)
 * @throws IOException only from code outside the guarded initialization
 *     step; failures inside that step are reported through the return value
 */
public static boolean scmInit(OzoneConfiguration conf) throws IOException {
  final SCMStorage storage = new SCMStorage(conf);

  if (storage.getState() == StorageState.INITIALIZED) {
    System.out.println(
        "SCM already initialized. Reusing existing cluster id for sd="
            + storage.getStorageDir()
            + ";cid="
            + storage.getClusterID());
    return true;
  }

  try {
    final String requestedClusterId = StartupOption.INIT.getClusterId();
    final boolean noClusterIdGiven =
        requestedClusterId == null || requestedClusterId.isEmpty();
    if (!noClusterIdGiven) {
      storage.setClusterId(requestedClusterId);
    }
    storage.initialize();
    System.out.println(
        "SCM initialization succeeded.Current cluster id for sd="
            + storage.getStorageDir()
            + ";cid="
            + storage.getClusterID());
    return true;
  } catch (IOException ioe) {
    LOG.error("Could not initialize SCM version file", ioe);
    return false;
  }
}
/**
 * Maps the command-line arguments to a startup option.
 *
 * <p>No arguments select REGULAR. The init flag selects INIT and may be
 * followed only by the cluster id flag and a non-empty value, which is
 * stored on the INIT option. The generate-cluster-id flag is accepted only
 * as the sole argument. Arguments that match neither flag leave the result
 * at HELP.
 *
 * @param args command-line arguments; {@code null} is treated as empty
 * @return the selected option, or {@code null} if the arguments are invalid
 */
private static StartupOption parseArguments(String[] args) {
  final int count = (args == null) ? 0 : args.length;
  StartupOption selected =
      (count == 0) ? StartupOption.REGULAR : StartupOption.HELP;

  int idx = 0;
  while (idx < count) {
    final String token = args[idx];
    if (StartupOption.INIT.getName().equalsIgnoreCase(token)) {
      selected = StartupOption.INIT;
      if (count > 3) {
        return null;
      }
      // Consume everything after the init flag: only
      // "<clusterid-flag> <value>" pairs are allowed.
      idx++;
      while (idx < count) {
        if (!args[idx].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
          return null;
        }
        idx++;
        if (idx >= count || args[idx].isEmpty()) {
          // if no cluster id specified or is empty string, return null
          LOG.error(
              "Must specify a valid cluster ID after the "
                  + StartupOption.CLUSTERID.getName()
                  + " flag");
          return null;
        }
        selected.setClusterId(args[idx]);
        idx++;
      }
    } else if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(token)) {
      if (count > 1) {
        return null;
      }
      selected = StartupOption.GENCLUSTERID;
    }
    idx++;
  }
  return selected;
}
/**
 * Creates a new SCM metrics object and stores it in the static metrics
 * field, replacing any previous value.
 */
public static void initMetrics() {
  final SCMMetrics created = SCMMetrics.create();
  StorageContainerManager.metrics = created;
}
/**
 * Returns the SCM metrics instance, creating it on first use.
 *
 * <p>If the metrics have not been initialized yet, a new instance is
 * created and remembered, so repeated calls return the same object instead
 * of building a fresh, unreferenced one on every call.
 *
 * @return the shared SCM metrics instance, never {@code null} unless
 *     {@code SCMMetrics.create()} itself returns {@code null}
 */
public static SCMMetrics getMetrics() {
  if (metrics == null) {
    // Cache the lazily created instance; otherwise every call made before
    // initMetrics() would hand out a different metrics object.
    metrics = SCMMetrics.create();
  }
  return metrics;
}
/**
 * Returns the SCM storage object created by the constructor.
 *
 * @return the SCM storage
 */
public SCMStorage getScmStorage() {
  return this.scmStorage;
}
/**
 * Returns the RPC endpoint that serves the datanode protocol.
 *
 * @return the datanode protocol server
 */
public SCMDatanodeProtocolServer getDatanodeProtocolServer() {
  return this.datanodeProtocolServer;
}
/**
 * Returns the RPC endpoint that serves the block protocol.
 *
 * @return the block protocol server
 */
public SCMBlockProtocolServer getBlockProtocolServer() {
  return this.blockProtocolServer;
}
/**
 * Returns the RPC endpoint that serves the client protocol.
 *
 * @return the client protocol server
 */
public SCMClientProtocolServer getClientProtocolServer() {
  return this.clientProtocolServer;
}
/**
 * Creates the cache of container reports sent by datanodes
 * (key = datanode UUID, value = container stat).
 *
 * <p>The cache is configured with the largest possible expiry and size.
 * Whenever an entry is removed, the removed stat is subtracted from the SCM
 * metrics while holding the cache's monitor.
 *
 * @param conf configuration; not read by this method
 */
private void initContainerReportCache(OzoneConfiguration conf) {
  final RemovalListener<String, ContainerStat> removalHandler =
      new RemovalListener<String, ContainerStat>() {
        @Override
        public void onRemoval(
            RemovalNotification<String, ContainerStat> removed) {
          synchronized (containerReportCache) {
            // remove invalid container report
            final ContainerStat removedStat = removed.getValue();
            metrics.decrContainerStat(removedStat);
            LOG.debug(
                "Remove expired container stat entry for datanode: {}.",
                removed.getKey());
          }
        }
      };

  containerReportCache =
      CacheBuilder.newBuilder()
          .expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
          .maximumSize(Integer.MAX_VALUE)
          .removalListener(removalHandler)
          .build();
}
/**
 * Registers this SCM as an MXBean and remembers the resulting object name
 * so that it can be unregistered later.
 */
private void registerMXBean() {
  final Map<String, String> beanProperties = new HashMap<>();
  beanProperties.put("component", "ServerRuntime");
  final ObjectName registeredName =
      MBeans.register(
          "StorageContainerManager",
          "StorageContainerManagerInfo",
          beanProperties,
          this);
  this.scmInfoBeanName = registeredName;
}
/**
 * Unregisters the SCM MXBean if one is registered and clears the stored
 * object name, so calling this again is a no-op.
 */
private void unregisterMXBean() {
  if (this.scmInfoBeanName == null) {
    return;
  }
  MBeans.unregister(this.scmInfoBeanName);
  this.scmInfoBeanName = null;
}
/**
 * Looks up a container through the container manager.
 *
 * @param containerID id of the container
 * @return the container info returned by the container manager
 * @throws IOException if the container manager lookup fails
 */
@VisibleForTesting
public ContainerInfo getContainerInfo(long containerID) throws
    IOException {
  final ContainerInfo info = containerManager.getContainer(containerID);
  return info;
}
/**
 * Returns the RPC address reported by the client protocol server.
 *
 * @return client RPC address as reported by the client protocol server
 */
@VisibleForTesting
public InetSocketAddress getClientRpcAddress() {
  final SCMClientProtocolServer clientServer = getClientProtocolServer();
  return clientServer.getClientRpcAddress();
}
/**
 * Returns the client RPC port as a string.
 *
 * @return the port of the client RPC address, or "0" when no address is
 *     available
 */
@Override
public String getClientRpcPort() {
  final InetSocketAddress clientAddress = getClientRpcAddress();
  if (clientAddress == null) {
    return "0";
  }
  return Integer.toString(clientAddress.getPort());
}
/**
 * Returns the RPC address reported by the datanode protocol server.
 *
 * @return address on which datanodes communicate with SCM, as reported by
 *     the datanode protocol server
 */
public InetSocketAddress getDatanodeRpcAddress() {
  final SCMDatanodeProtocolServer datanodeServer =
      getDatanodeProtocolServer();
  return datanodeServer.getDatanodeRpcAddress();
}
/**
 * Returns the datanode RPC port as a string.
 *
 * @return the port of the datanode RPC address, or "0" when no address is
 *     available
 */
@Override
public String getDatanodeRpcPort() {
  final InetSocketAddress datanodeAddress = getDatanodeRpcAddress();
  if (datanodeAddress == null) {
    return "0";
  }
  return Integer.toString(datanodeAddress.getPort());
}
/**
 * Starts the SCM services.
 *
 * <p>Order: metrics system and command-watcher lease manager, then the
 * client, block and datanode RPC servers (each preceded by a log line with
 * its address), then the HTTP server, block manager, replication status
 * tracker and replication manager. The start time is recorded last.
 *
 * @throws IOException if one of the services fails to start; services
 *     started before the failure are not stopped by this method
 */
public void start() throws IOException {
  LOG.info(
      buildRpcServerStartMessage(
          "StorageContainerLocationProtocol RPC server",
          getClientRpcAddress()));
  DefaultMetricsSystem.initialize("StorageContainerManager");

  commandWatcherLeaseManager.start();
  getClientProtocolServer().start();

  LOG.info(buildRpcServerStartMessage("ScmBlockLocationProtocol RPC " +
      "server", getBlockProtocolServer().getBlockRpcAddress()));
  getBlockProtocolServer().start();

  // Protocol name spelled out correctly so the line can be found by
  // searching the logs for the protocol.
  LOG.info(buildRpcServerStartMessage("ScmDatanodeProtocol RPC " +
      "server", getDatanodeProtocolServer().getDatanodeRpcAddress()));
  getDatanodeProtocolServer().start();

  httpServer.start();
  scmBlockManager.start();
  replicationStatus.start();
  replicationManager.start();
  setStartTime();
}
/**
 * Stops the SCM services.
 *
 * <p>Shutdown is best effort: each service is stopped in its own try block,
 * and a failure is logged with its cause before the remaining services are
 * stopped. After the services, the container report cache is emptied, the
 * metrics and the MXBean are unregistered, the event queue is closed and
 * finally the container manager is closed.
 */
public void stop() {
  try {
    LOG.info("Stopping Replication Activity Status tracker.");
    replicationStatus.close();
  } catch (Exception ex) {
    LOG.error("Replication Activity Status tracker stop failed.", ex);
  }

  try {
    LOG.info("Stopping Replication Manager Service.");
    replicationManager.stop();
  } catch (Exception ex) {
    LOG.error("Replication manager service stop failed.", ex);
  }

  try {
    LOG.info("Stopping Lease Manager of the command watchers");
    commandWatcherLeaseManager.shutdown();
  } catch (Exception ex) {
    // Pass the exception so the cause and stack trace are not lost.
    LOG.error("Lease Manager of the command watchers stop failed", ex);
  }

  try {
    LOG.info("Stopping datanode service RPC server");
    getDatanodeProtocolServer().stop();
  } catch (Exception ex) {
    LOG.error("Storage Container Manager datanode RPC stop failed.", ex);
  }

  try {
    LOG.info("Stopping block service RPC server");
    getBlockProtocolServer().stop();
  } catch (Exception ex) {
    LOG.error("Storage Container Manager blockRpcServer stop failed.", ex);
  }

  try {
    LOG.info("Stopping the StorageContainerLocationProtocol RPC server");
    getClientProtocolServer().stop();
  } catch (Exception ex) {
    LOG.error("Storage Container Manager clientRpcServer stop failed.", ex);
  }

  try {
    LOG.info("Stopping Storage Container Manager HTTP server.");
    httpServer.stop();
  } catch (Exception ex) {
    LOG.error("Storage Container Manager HTTP server stop failed.", ex);
  }

  try {
    LOG.info("Stopping Block Manager Service.");
    scmBlockManager.stop();
  } catch (Exception ex) {
    LOG.error("SCM block manager service stop failed.", ex);
  }

  // Drop all cached container reports; cleanUp() performs the cache's
  // pending maintenance.
  if (containerReportCache != null) {
    containerReportCache.invalidateAll();
    containerReportCache.cleanUp();
  }

  if (metrics != null) {
    metrics.unRegister();
  }

  unregisterMXBean();

  // Event queue must be stopped before the DB store is closed at the end.
  try {
    LOG.info("Stopping SCM Event Queue.");
    eventQueue.close();
  } catch (Exception ex) {
    LOG.error("SCM Event Queue stop failed", ex);
  }
  IOUtils.cleanupWithLogger(LOG, containerManager);
}
/**
 * Blocks until the block, client and datanode RPC servers have finished,
 * in that order. If the waiting thread is interrupted, its interrupt flag
 * is restored and the method returns.
 */
public void join() {
  final SCMBlockProtocolServer blockServer = getBlockProtocolServer();
  final SCMClientProtocolServer clientServer = getClientProtocolServer();
  final SCMDatanodeProtocolServer datanodeServer =
      getDatanodeProtocolServer();
  try {
    blockServer.join();
    clientServer.join();
    datanodeServer.join();
  } catch (InterruptedException interrupted) {
    Thread.currentThread().interrupt();
    LOG.info("Interrupted during StorageContainerManager join.");
  }
}
/**
 * Returns the number of datanodes in the given state, as counted by the
 * node manager.
 *
 * @param nodestate node state to count (Healthy, Dead etc.)
 * @return number of nodes in that state
 */
public int getNodeCount(NodeState nodestate) {
  final int nodesInState = scmNodeManager.getNodeCount(nodestate);
  return nodesInState;
}
/**
 * Returns the container manager created by the constructor.
 *
 * @return the SCM container manager
 */
@VisibleForTesting
public ContainerManager getContainerManager() {
  return this.containerManager;
}
/**
 * Returns the node manager created by the constructor.
 *
 * @return the SCM node manager
 */
@VisibleForTesting
public NodeManager getScmNodeManager() {
  return this.scmNodeManager;
}
/**
 * Returns the block manager created by the constructor.
 *
 * @return the SCM block manager
 */
@VisibleForTesting
public BlockManager getScmBlockManager() {
  return this.scmBlockManager;
}
/**
 * Verifies that the given user is one of the SCM administrators.
 *
 * <p>A {@code null} user is not checked and passes.
 *
 * @param remoteUser name of the calling user, may be {@code null}
 * @throws IOException if the user is non-null and not in the administrator
 *     list
 */
public void checkAdminAccess(String remoteUser) throws IOException {
  if (remoteUser == null) {
    return;
  }
  if (scmAdminUsernames.contains(remoteUser)) {
    return;
  }
  throw new IOException(
      "Access denied for user " + remoteUser
          + ". Superuser privilege is required.");
}
/**
 * Invalidates the cached container stat of one datanode, holding the
 * cache's monitor while doing so.
 *
 * @param datanodeUuid UUID of the datanode whose entry is dropped
 */
public void removeContainerReport(String datanodeUuid) {
  final Cache<String, ContainerStat> reports = containerReportCache;
  synchronized (reports) {
    reports.invalidate(datanodeUuid);
  }
}
/**
 * Returns the cached container stat of one datanode.
 *
 * @param datanodeUuid UUID of the datanode
 * @return the cached stat, or {@code null} if the cache has no entry for
 *     that datanode
 */
public ContainerStat getContainerReport(String datanodeUuid) {
  synchronized (containerReportCache) {
    return containerReportCache.getIfPresent(datanodeUuid);
  }
}
/**
 * Exposes the container stat cache as a map view. The view is backed by
 * the cache, so modifying the map modifies the cache.
 *
 * @return live map view of the cache, keyed by datanode UUID
 */
public ConcurrentMap<String, ContainerStat> getContainerReportCache() {
  final ConcurrentMap<String, ContainerStat> liveView =
      containerReportCache.asMap();
  return liveView;
}
/**
 * Returns a snapshot of all cached container stats rendered as JSON.
 *
 * @return new map from datanode UUID to the JSON string of its container
 *     stat; changes to it do not affect the cache
 */
@Override
public Map<String, String> getContainerReport() {
  final Map<String, String> jsonByDatanode = new HashMap<>();
  synchronized (containerReportCache) {
    for (Map.Entry<String, ContainerStat> cached
        : containerReportCache.asMap().entrySet()) {
      final String statJson = cached.getValue().toJsonString();
      jsonByDatanode.put(cached.getKey(), statJson);
    }
  }
  return jsonByDatanode;
}
/**
 * Returns the live chill mode container threshold; same value as
 * {@link #getCurrentContainerThreshold()}.
 *
 * @return current container threshold
 */
@Override
public double getChillModeCurrentContainerThreshold() {
  final double currentThreshold = getCurrentContainerThreshold();
  return currentThreshold;
}
/**
 * Reports whether SCM is in chill mode, as tracked by the chill mode
 * manager.
 *
 * @return true if the chill mode manager reports chill mode
 */
@Override
public boolean isInChillMode() {
  final boolean inChillMode = scmChillModeManager.getInChillMode();
  return inChillMode;
}
/**
 * Returns the SCM event queue, typed as an event publisher.
 *
 * @return the event queue used by this SCM
 */
public EventPublisher getEventQueue() {
  return this.eventQueue;
}
/**
 * Forces SCM out of chill mode by asking the chill mode manager to exit,
 * passing it the event queue.
 *
 * @return always {@code true}
 */
public boolean exitChillMode() {
  final EventQueue publisher = eventQueue;
  scmChillModeManager.exitChillMode(publisher);
  return true;
}
/**
 * Returns the current container threshold reported by the chill mode
 * manager.
 *
 * @return current container threshold
 */
@VisibleForTesting
public double getCurrentContainerThreshold() {
  final double threshold =
      scmChillModeManager.getCurrentContainerThreshold();
  return threshold;
}
/**
 * Startup options accepted on the SCM command line. Each constant carries
 * its command-line flag and, optionally, a cluster id.
 *
 * <p>The cluster id is mutable state stored on the enum constant itself,
 * so it is shared by everything in the JVM that uses that constant.
 */
public enum StartupOption {
  INIT("-init"),
  CLUSTERID("-clusterid"),
  GENCLUSTERID("-genclusterid"),
  REGULAR("-regular"),
  HELP("-help");

  /** Command-line flag of this option. */
  private final String flag;
  /** Cluster id attached to this option; {@code null} until one is set. */
  private String clusterId;

  StartupOption(String flag) {
    this.flag = flag;
  }

  /**
   * @return the cluster id set on this option, or {@code null} if none
   */
  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * Stores a cluster id on this option. A {@code null} or empty value is
   * ignored and leaves the current id unchanged.
   *
   * @param cid cluster id to store
   */
  public void setClusterId(String cid) {
    if (cid == null || cid.isEmpty()) {
      return;
    }
    this.clusterId = cid;
  }

  /**
   * @return the command-line flag of this option
   */
  public String getName() {
    return this.flag;
  }
}
}