| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license |
| * agreements. See the NOTICE file distributed with this work for additional |
| * information regarding |
| * copyright ownership. The ASF licenses this file to you under the Apache |
| * License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the |
| * License. You may obtain a |
| * copy of the License at |
| * |
| * <p>http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * <p>Unless required by applicable law or agreed to in writing, software |
| * distributed under the |
| * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR |
| * CONDITIONS OF ANY KIND, either |
| * express or implied. See the License for the specific language governing |
| * permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdds.scm.server; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.RemovalListener; |
| import com.google.common.cache.RemovalNotification; |
| import com.google.protobuf.BlockingService; |
| import java.util.Objects; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdds.HddsUtils; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; |
| import org.apache.hadoop.hdds.ratis.RatisHelper; |
| import org.apache.hadoop.hdds.scm.HddsServerUtil; |
| import org.apache.hadoop.hdds.scm.ScmConfigKeys; |
| import org.apache.hadoop.hdds.scm.block.BlockManager; |
| import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; |
| import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; |
| import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler; |
| import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration; |
| import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory; |
| import org.apache.hadoop.hdds.scm.container.placement.algorithms |
| .SCMContainerPlacementMetrics; |
| import org.apache.hadoop.hdds.scm.net.NetworkTopology; |
| import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; |
| import org.apache.hadoop.hdds.scm.safemode.SafeModeHandler; |
| import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; |
| import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; |
| import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; |
| import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; |
| import org.apache.hadoop.hdds.scm.container.ContainerID; |
| import org.apache.hadoop.hdds.scm.container.ContainerInfo; |
| import org.apache.hadoop.hdds.scm.container.ContainerManager; |
| import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; |
| import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler; |
| import org.apache.hadoop.hdds.scm.container.SCMContainerManager; |
| import org.apache.hadoop.hdds.scm.PlacementPolicy; |
| import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; |
| import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics; |
| import org.apache.hadoop.hdds.scm.container.ReplicationManager; |
| import org.apache.hadoop.hdds.scm.events.SCMEvents; |
| import org.apache.hadoop.hdds.scm.exceptions.SCMException; |
| import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; |
| import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; |
| import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreRDBImpl; |
| import org.apache.hadoop.hdds.scm.node.DeadNodeHandler; |
| import org.apache.hadoop.hdds.scm.node.NewNodeHandler; |
| import org.apache.hadoop.hdds.scm.node.NonHealthyToHealthyNodeHandler; |
| import org.apache.hadoop.hdds.scm.node.NodeManager; |
| import org.apache.hadoop.hdds.scm.node.NodeReportHandler; |
| import org.apache.hadoop.hdds.scm.node.SCMNodeManager; |
| import org.apache.hadoop.hdds.scm.node.StaleNodeHandler; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler; |
| import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; |
| import org.apache.hadoop.hdds.security.x509.SecurityConfig; |
| import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer; |
| import org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer; |
| import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; |
| import org.apache.hadoop.hdds.server.events.EventPublisher; |
| import org.apache.hadoop.hdds.server.events.EventQueue; |
| import org.apache.hadoop.hdfs.DFSUtil; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.metrics2.MetricsSystem; |
| import org.apache.hadoop.metrics2.util.MBeans; |
| import org.apache.hadoop.ozone.OzoneConfigKeys; |
| import org.apache.hadoop.ozone.OzoneSecurityUtil; |
| import org.apache.hadoop.ozone.common.Storage.StorageState; |
| import org.apache.hadoop.ozone.lease.LeaseManager; |
| import org.apache.hadoop.ozone.lock.LockManager; |
| import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; |
| import org.apache.hadoop.security.authentication.client.AuthenticationException; |
| import org.apache.hadoop.util.JvmPauseMonitor; |
| import org.apache.hadoop.hdds.utils.HddsVersionInfo; |
| import org.apache.ratis.grpc.GrpcTlsConfig; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.management.ObjectName; |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED; |
| |
/**
 * StorageContainerManager is the main entry point for the service that
 * provides information about which datanodes host containers.
 *
 * <p>DataNodes report to StorageContainerManager using heartbeat messages.
 * SCM allocates containers and returns a pipeline.
 *
 * <p>Once a client gets a pipeline (a list of datanodes), it connects to the
 * datanodes and creates a container, which can then be used to store data.
 */
| @InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"}) |
| public final class StorageContainerManager extends ServiceRuntimeInfoImpl |
| implements SCMMXBean { |
| |
private static final Logger LOG = LoggerFactory
    .getLogger(StorageContainerManager.class);

/**
 * SCM metrics. This field is static, so it is shared by every SCM instance
 * in the JVM; initMetrics() replaces it and stop() unregisters it.
 */
private static SCMMetrics metrics;

/*
 * RPC Endpoints exposed by SCM.
 */
private final SCMDatanodeProtocolServer datanodeProtocolServer;
private final SCMBlockProtocolServer blockProtocolServer;
private final SCMClientProtocolServer clientProtocolServer;
// Only created when Ozone security is enabled; null otherwise.
private SCMSecurityProtocolServer securityProtocolServer;

/*
 * State Managers of SCM. Each one is taken from the SCMConfigurator when it
 * supplies one; otherwise a default implementation is built in
 * initializeSystemManagers().
 */
private NodeManager scmNodeManager;
private PipelineManager pipelineManager;
private ContainerManager containerManager;
private BlockManager scmBlockManager;
// Loaded from conf in the constructor; construction fails unless its state
// is INITIALIZED.
private final SCMStorageConfig scmStorageConfig;

// SCM DB handle; opened (or supplied by the configurator) during
// construction and stopped at the end of stop().
private SCMMetadataStore scmMetadataStore;

// Dispatches SCM events to the handlers registered in the constructor.
private final EventQueue eventQueue;
/*
 * HTTP endpoint for JMX access.
 */
private final StorageContainerManagerHttpServer httpServer;
/**
 * SCM super user: the current UserGroupInformation user at construction
 * time.
 */
private final String scmUsername;
// Users listed in OZONE_ADMINISTRATORS, always including scmUsername.
private final Collection<String> scmAdminUsernames;
/**
 * SCM mxbean name; null while the bean is not registered.
 */
private ObjectName scmInfoBeanName;
/**
 * Key = DatanodeUuid, value = ContainerStat. When an entry is removed its
 * stat is subtracted from {@link #metrics}.
 */
private Cache<String, ContainerStat> containerReportCache;

private ReplicationManager replicationManager;

// Lease manager for the retriable datanode command watcher; its timeout is
// read from HDDS_SCM_WATCHER_TIMEOUT.
private final LeaseManager<Long> commandWatcherLeaseManager;

private SCMSafeModeManager scmSafeModeManager;
// Security artifacts; only set when Ozone security is enabled, otherwise
// they remain null.
private CertificateServer certificateServer;
private GrpcTlsConfig grpcTlsConfig;

// Created and started in start(); stopped in stop().
private JvmPauseMonitor jvmPauseMonitor;
private final OzoneConfiguration configuration;
private final SafeModeHandler safeModeHandler;
private SCMContainerMetrics scmContainerMetrics;
// Metrics system initialized in start() and stopped in stop().
private MetricsSystem ms;

/**
 * Network topology Map.
 */
private NetworkTopology clusterMap;
| |
/**
 * Creates a StorageContainerManager that uses the default implementation
 * of every manager.
 *
 * @param conf configuration
 * @throws IOException if SCM cannot be initialized
 * @throws AuthenticationException if security is enabled and the SCM user
 *     login fails
 */
public StorageContainerManager(OzoneConfiguration conf)
    throws IOException, AuthenticationException {
  // A configurator with nothing set makes SCM fall back to its defaults.
  this(conf, new SCMConfigurator());
}
| |
| |
| /** |
| * This constructor offers finer control over how SCM comes up. |
| * To use this, user needs to create a SCMConfigurator and set various |
| * managers that user wants SCM to use, if a value is missing then SCM will |
| * use the default value for that manager. |
| * |
| * @param conf - Configuration |
| * @param configurator - configurator |
| */ |
| public StorageContainerManager(OzoneConfiguration conf, |
| SCMConfigurator configurator) |
| throws IOException, AuthenticationException { |
| super(HddsVersionInfo.HDDS_VERSION_INFO); |
| |
| Objects.requireNonNull(configurator, "configurator cannot not be null"); |
| Objects.requireNonNull(conf, "configuration cannot not be null"); |
| |
| configuration = conf; |
| initMetrics(); |
| initContainerReportCache(conf); |
| /** |
| * It is assumed the scm --init command creates the SCM Storage Config. |
| */ |
| scmStorageConfig = new SCMStorageConfig(conf); |
| if (scmStorageConfig.getState() != StorageState.INITIALIZED) { |
| LOG.error("Please make sure you have run \'ozone scm --init\' " + |
| "command to generate all the required metadata."); |
| throw new SCMException("SCM not initialized due to storage config " + |
| "failure.", ResultCodes.SCM_NOT_INITIALIZED); |
| } |
| |
| /** |
| * Important : This initialization sequence is assumed by some of our tests. |
| * The testSecureOzoneCluster assumes that security checks have to be |
| * passed before any artifacts like SCM DB is created. So please don't |
| * add any other initialization above the Security checks please. |
| */ |
| if (OzoneSecurityUtil.isSecurityEnabled(conf)) { |
| loginAsSCMUser(conf); |
| } |
| |
| // Creates the SCM DBs or opens them if it exists. |
| // A valid pointer to the store is required by all the other services below. |
| initalizeMetadataStore(conf, configurator); |
| |
| // Authenticate SCM if security is enabled, this initialization can only |
| // be done after the metadata store is initialized. |
| if (OzoneSecurityUtil.isSecurityEnabled(conf)) { |
| initializeCAnSecurityProtocol(conf, configurator); |
| } else { |
| // if no Security, we do not create a Certificate Server at all. |
| // This allows user to boot SCM without security temporarily |
| // and then come back and enable it without any impact. |
| certificateServer = null; |
| securityProtocolServer = null; |
| } |
| |
| eventQueue = new EventQueue(); |
| long watcherTimeout = |
| conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, |
| HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); |
| commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher", |
| watcherTimeout); |
| initializeSystemManagers(conf, configurator); |
| |
| CloseContainerEventHandler closeContainerHandler = |
| new CloseContainerEventHandler(pipelineManager, containerManager); |
| NodeReportHandler nodeReportHandler = |
| new NodeReportHandler(scmNodeManager); |
| PipelineReportHandler pipelineReportHandler = |
| new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf); |
| CommandStatusReportHandler cmdStatusReportHandler = |
| new CommandStatusReportHandler(); |
| |
| NewNodeHandler newNodeHandler = new NewNodeHandler(pipelineManager, conf); |
| StaleNodeHandler staleNodeHandler = |
| new StaleNodeHandler(scmNodeManager, pipelineManager, conf); |
| DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager, |
| pipelineManager, containerManager); |
| NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler = |
| new NonHealthyToHealthyNodeHandler(pipelineManager, conf); |
| ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); |
| PendingDeleteHandler pendingDeleteHandler = |
| new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService()); |
| |
| ContainerReportHandler containerReportHandler = |
| new ContainerReportHandler(scmNodeManager, containerManager); |
| |
| IncrementalContainerReportHandler incrementalContainerReportHandler = |
| new IncrementalContainerReportHandler( |
| scmNodeManager, containerManager); |
| |
| PipelineActionHandler pipelineActionHandler = |
| new PipelineActionHandler(pipelineManager, conf); |
| |
| |
| RetriableDatanodeEventWatcher retriableDatanodeEventWatcher = |
| new RetriableDatanodeEventWatcher<>( |
| SCMEvents.RETRIABLE_DATANODE_COMMAND, |
| SCMEvents.DELETE_BLOCK_STATUS, |
| commandWatcherLeaseManager); |
| retriableDatanodeEventWatcher.start(eventQueue); |
| |
| scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys |
| .OZONE_ADMINISTRATORS); |
| scmUsername = UserGroupInformation.getCurrentUser().getUserName(); |
| if (!scmAdminUsernames.contains(scmUsername)) { |
| scmAdminUsernames.add(scmUsername); |
| } |
| |
| datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this, |
| eventQueue); |
| blockProtocolServer = new SCMBlockProtocolServer(conf, this); |
| clientProtocolServer = new SCMClientProtocolServer(conf, this); |
| httpServer = new StorageContainerManagerHttpServer(conf); |
| |
| safeModeHandler = new SafeModeHandler(configuration, |
| clientProtocolServer, scmBlockManager, replicationManager, |
| pipelineManager); |
| |
| eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); |
| eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager); |
| eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); |
| eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler); |
| eventQueue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT, |
| incrementalContainerReportHandler); |
| eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler); |
| eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler); |
| eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler); |
| eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler); |
| eventQueue.addHandler(SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE, |
| nonHealthyToHealthyNodeHandler); |
| eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler); |
| eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler); |
| eventQueue |
| .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler); |
| eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS, |
| (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog()); |
| eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); |
| eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); |
| eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, safeModeHandler); |
| registerMXBean(); |
| registerMetricsSource(this); |
| } |
| |
| /** |
| * This function initializes the following managers. If the configurator |
| * specifies a value, we will use it, else we will use the default value. |
| * |
| * Node Manager |
| * Pipeline Manager |
| * Container Manager |
| * Block Manager |
| * Replication Manager |
| * Safe Mode Manager |
| * |
| * @param conf - Ozone Configuration. |
| * @param configurator - A customizer which allows different managers to be |
| * used if needed. |
| * @throws IOException - on Failure. |
| */ |
| private void initializeSystemManagers(OzoneConfiguration conf, |
| SCMConfigurator configurator) |
| throws IOException { |
| if (configurator.getNetworkTopology() != null) { |
| clusterMap = configurator.getNetworkTopology(); |
| } else { |
| clusterMap = new NetworkTopologyImpl(conf); |
| } |
| |
| if(configurator.getScmNodeManager() != null) { |
| scmNodeManager = configurator.getScmNodeManager(); |
| } else { |
| scmNodeManager = new SCMNodeManager( |
| conf, scmStorageConfig, eventQueue, clusterMap); |
| } |
| |
| SCMContainerPlacementMetrics placementMetrics = |
| SCMContainerPlacementMetrics.create(); |
| PlacementPolicy containerPlacementPolicy = |
| ContainerPlacementPolicyFactory.getPolicy(conf, scmNodeManager, |
| clusterMap, true, placementMetrics); |
| |
| if (configurator.getPipelineManager() != null) { |
| pipelineManager = configurator.getPipelineManager(); |
| } else { |
| pipelineManager = |
| new SCMPipelineManager(conf, scmNodeManager, eventQueue, |
| grpcTlsConfig); |
| } |
| |
| if (configurator.getContainerManager() != null) { |
| containerManager = configurator.getContainerManager(); |
| } else { |
| containerManager = new SCMContainerManager( |
| conf, scmNodeManager, pipelineManager, eventQueue); |
| } |
| |
| if (configurator.getScmBlockManager() != null) { |
| scmBlockManager = configurator.getScmBlockManager(); |
| } else { |
| scmBlockManager = new BlockManagerImpl(conf, this); |
| } |
| if (configurator.getReplicationManager() != null) { |
| replicationManager = configurator.getReplicationManager(); |
| } else { |
| replicationManager = new ReplicationManager( |
| conf.getObject(ReplicationManagerConfiguration.class), |
| containerManager, |
| containerPlacementPolicy, |
| eventQueue, |
| new LockManager<>(conf)); |
| } |
| if(configurator.getScmSafeModeManager() != null) { |
| scmSafeModeManager = configurator.getScmSafeModeManager(); |
| } else { |
| scmSafeModeManager = new SCMSafeModeManager(conf, |
| containerManager.getContainers(), pipelineManager, eventQueue); |
| } |
| } |
| |
| /** |
| * If security is enabled we need to have the Security Protocol and a |
| * default CA. This function initializes those values based on the |
| * configurator. |
| * |
| * @param conf - Config |
| * @param configurator - configurator |
| * @throws IOException - on Failure |
| * @throws AuthenticationException - on Failure |
| */ |
| private void initializeCAnSecurityProtocol(OzoneConfiguration conf, |
| SCMConfigurator configurator) throws IOException { |
| if(configurator.getCertificateServer() != null) { |
| this.certificateServer = configurator.getCertificateServer(); |
| } else { |
| // This assumes that SCM init has run, and DB metadata stores are created. |
| certificateServer = initializeCertificateServer( |
| getScmStorageConfig().getClusterID(), |
| getScmStorageConfig().getScmId()); |
| } |
| // TODO: Support Intermediary CAs in future. |
| certificateServer.init(new SecurityConfig(conf), |
| CertificateServer.CAType.SELF_SIGNED_CA); |
| securityProtocolServer = new SCMSecurityProtocolServer(conf, |
| certificateServer); |
| |
| grpcTlsConfig = RatisHelper |
| .createTlsClientConfigForSCM(new SecurityConfig(conf), |
| certificateServer); |
| } |
| |
| /** |
| * Init the metadata store based on the configurator. |
| * @param conf - Config |
| * @param configurator - configurator |
| * @throws IOException - on Failure |
| */ |
| private void initalizeMetadataStore(OzoneConfiguration conf, |
| SCMConfigurator configurator) |
| throws IOException { |
| if(configurator.getMetadataStore() != null) { |
| scmMetadataStore = configurator.getMetadataStore(); |
| } else { |
| scmMetadataStore = new SCMMetadataStoreRDBImpl(conf); |
| if (scmMetadataStore == null) { |
| throw new SCMException("Unable to initialize metadata store", |
| ResultCodes.SCM_NOT_INITIALIZED); |
| } |
| } |
| } |
| |
| /** |
| * Login as the configured user for SCM. |
| * |
| * @param conf |
| */ |
| private void loginAsSCMUser(Configuration conf) |
| throws IOException, AuthenticationException { |
| LOG.debug("Ozone security is enabled. Attempting login for SCM user. " |
| + "Principal: {}, keytab: {}", |
| conf.get(HDDS_SCM_KERBEROS_PRINCIPAL_KEY), |
| conf.get(HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY)); |
| |
| if (SecurityUtil.getAuthenticationMethod(conf).equals( |
| AuthenticationMethod.KERBEROS)) { |
| UserGroupInformation.setConfiguration(conf); |
| InetSocketAddress socAddr = HddsServerUtil |
| .getScmBlockClientBindAddress(conf); |
| SecurityUtil.login(conf, HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY, |
| HDDS_SCM_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName()); |
| } else { |
| throw new AuthenticationException(SecurityUtil.getAuthenticationMethod( |
| conf) + " authentication method not support. " |
| + "SCM user login failed."); |
| } |
| LOG.info("SCM login successful."); |
| } |
| |
| |
| /** |
| * This function creates/initializes a certificate server as needed. |
| * This function is idempotent, so calling this again and again after the |
| * server is initialized is not a problem. |
| * |
| * @param clusterID - Cluster ID |
| * @param scmID - SCM ID |
| */ |
| private CertificateServer initializeCertificateServer(String clusterID, |
| String scmID) throws IOException { |
| // TODO: Support Certificate Server loading via Class Name loader. |
| // So it is easy to use different Certificate Servers if needed. |
| String subject = "scm@" + InetAddress.getLocalHost().getHostName(); |
| if(this.scmMetadataStore == null) { |
| LOG.error("Cannot initialize Certificate Server without a valid meta " + |
| "data layer."); |
| throw new SCMException("Cannot initialize CA without a valid metadata " + |
| "store", ResultCodes.SCM_NOT_INITIALIZED); |
| } |
| SCMCertStore certStore = new SCMCertStore(this.scmMetadataStore); |
| return new DefaultCAServer(subject, clusterID, scmID, certStore); |
| } |
| |
| /** |
| * Builds a message for logging startup information about an RPC server. |
| * |
| * @param description RPC server description |
| * @param addr RPC server listening address |
| * @return server startup message |
| */ |
| public static String buildRpcServerStartMessage(String description, |
| InetSocketAddress addr) { |
| return addr != null |
| ? String.format("%s is listening at %s", description, addr.toString()) |
| : String.format("%s not started", description); |
| } |
| |
| /** |
| * Starts an RPC server, if configured. |
| * |
| * @param conf configuration |
| * @param addr configured address of RPC server |
| * @param protocol RPC protocol provided by RPC server |
| * @param instance RPC protocol implementation instance |
| * @param handlerCount RPC server handler count |
| * @return RPC server |
| * @throws IOException if there is an I/O error while creating RPC server |
| */ |
| public static RPC.Server startRpcServer( |
| OzoneConfiguration conf, |
| InetSocketAddress addr, |
| Class<?> protocol, |
| BlockingService instance, |
| int handlerCount) |
| throws IOException { |
| RPC.Server rpcServer = |
| new RPC.Builder(conf) |
| .setProtocol(protocol) |
| .setInstance(instance) |
| .setBindAddress(addr.getHostString()) |
| .setPort(addr.getPort()) |
| .setNumHandlers(handlerCount) |
| .setVerbose(false) |
| .setSecretManager(null) |
| .build(); |
| |
| DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer); |
| return rpcServer; |
| } |
| |
| /** |
| * Create an SCM instance based on the supplied configuration. |
| * |
| * @param conf HDDS configuration |
| * @return SCM instance |
| * @throws IOException, AuthenticationException |
| */ |
| public static StorageContainerManager createSCM( |
| OzoneConfiguration conf) |
| throws IOException, AuthenticationException { |
| if (!HddsUtils.isHddsEnabled(conf)) { |
| System.err.println( |
| "SCM cannot be started in secure mode or when " + OZONE_ENABLED + "" + |
| " is set to false"); |
| System.exit(1); |
| } |
| return new StorageContainerManager(conf); |
| } |
| |
| /** |
| * Routine to set up the Version info for StorageContainerManager. |
| * |
| * @param conf OzoneConfiguration |
| * @return true if SCM initialization is successful, false otherwise. |
| * @throws IOException if init fails due to I/O error |
| */ |
| public static boolean scmInit(OzoneConfiguration conf, |
| String clusterId) throws IOException { |
| SCMStorageConfig scmStorageConfig = new SCMStorageConfig(conf); |
| StorageState state = scmStorageConfig.getState(); |
| if (state != StorageState.INITIALIZED) { |
| try { |
| if (clusterId != null && !clusterId.isEmpty()) { |
| scmStorageConfig.setClusterId(clusterId); |
| } |
| scmStorageConfig.initialize(); |
| System.out.println( |
| "SCM initialization succeeded." |
| + "Current cluster id for sd=" |
| + scmStorageConfig.getStorageDir() |
| + ";cid=" |
| + scmStorageConfig.getClusterID()); |
| return true; |
| } catch (IOException ioe) { |
| LOG.error("Could not initialize SCM version file", ioe); |
| return false; |
| } |
| } else { |
| System.out.println( |
| "SCM already initialized. Reusing existing" |
| + " cluster id for sd=" |
| + scmStorageConfig.getStorageDir() |
| + ";cid=" |
| + scmStorageConfig.getClusterID()); |
| return true; |
| } |
| } |
| |
| /** |
| * Initialize SCM metrics. |
| */ |
| public static void initMetrics() { |
| metrics = SCMMetrics.create(); |
| } |
| |
| /** |
| * Return SCM metrics instance. |
| */ |
| public static SCMMetrics getMetrics() { |
| return metrics == null ? SCMMetrics.create() : metrics; |
| } |
| |
| public SCMStorageConfig getScmStorageConfig() { |
| return scmStorageConfig; |
| } |
| |
| public SCMDatanodeProtocolServer getDatanodeProtocolServer() { |
| return datanodeProtocolServer; |
| } |
| |
| public SCMBlockProtocolServer getBlockProtocolServer() { |
| return blockProtocolServer; |
| } |
| |
| public SCMClientProtocolServer getClientProtocolServer() { |
| return clientProtocolServer; |
| } |
| |
| public SCMSecurityProtocolServer getSecurityProtocolServer() { |
| return securityProtocolServer; |
| } |
| |
| /** |
| * Initialize container reports cache that sent from datanodes. |
| * |
| * @param conf |
| */ |
| private void initContainerReportCache(OzoneConfiguration conf) { |
| containerReportCache = |
| CacheBuilder.newBuilder() |
| .expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS) |
| .maximumSize(Integer.MAX_VALUE) |
| .removalListener( |
| new RemovalListener<String, ContainerStat>() { |
| @Override |
| public void onRemoval( |
| RemovalNotification<String, ContainerStat> |
| removalNotification) { |
| synchronized (containerReportCache) { |
| ContainerStat stat = removalNotification.getValue(); |
| // remove invalid container report |
| metrics.decrContainerStat(stat); |
| LOG.debug( |
| "Remove expired container stat entry for datanode: " + |
| "{}.", |
| removalNotification.getKey()); |
| } |
| } |
| }) |
| .build(); |
| } |
| |
| private void registerMXBean() { |
| final Map<String, String> jmxProperties = new HashMap<>(); |
| jmxProperties.put("component", "ServerRuntime"); |
| this.scmInfoBeanName = HddsUtils.registerWithJmxProperties( |
| "StorageContainerManager", "StorageContainerManagerInfo", |
| jmxProperties, this); |
| } |
| |
| private void registerMetricsSource(SCMMXBean scmMBean) { |
| scmContainerMetrics = SCMContainerMetrics.create(scmMBean); |
| } |
| |
| private void unregisterMXBean() { |
| if (this.scmInfoBeanName != null) { |
| MBeans.unregister(this.scmInfoBeanName); |
| this.scmInfoBeanName = null; |
| } |
| } |
| |
| @VisibleForTesting |
| public ContainerInfo getContainerInfo(long containerID) throws |
| IOException { |
| return containerManager.getContainer(ContainerID.valueof(containerID)); |
| } |
| |
| /** |
| * Returns listening address of StorageLocation Protocol RPC server. |
| * |
| * @return listen address of StorageLocation RPC server |
| */ |
| @VisibleForTesting |
| public InetSocketAddress getClientRpcAddress() { |
| return getClientProtocolServer().getClientRpcAddress(); |
| } |
| |
| @Override |
| public String getClientRpcPort() { |
| InetSocketAddress addr = getClientRpcAddress(); |
| return addr == null ? "0" : Integer.toString(addr.getPort()); |
| } |
| |
| /** |
| * Returns listening address of StorageDatanode Protocol RPC server. |
| * |
| * @return Address where datanode are communicating. |
| */ |
| public InetSocketAddress getDatanodeRpcAddress() { |
| return getDatanodeProtocolServer().getDatanodeRpcAddress(); |
| } |
| |
| @Override |
| public String getDatanodeRpcPort() { |
| InetSocketAddress addr = getDatanodeRpcAddress(); |
| return addr == null ? "0" : Integer.toString(addr.getPort()); |
| } |
| |
| /** |
| * Start service. |
| */ |
| public void start() throws IOException { |
| LOG.info( |
| buildRpcServerStartMessage( |
| "StorageContainerLocationProtocol RPC server", |
| getClientRpcAddress())); |
| |
| ms = HddsUtils.initializeMetrics(configuration, "StorageContainerManager"); |
| |
| commandWatcherLeaseManager.start(); |
| getClientProtocolServer().start(); |
| |
| LOG.info(buildRpcServerStartMessage("ScmBlockLocationProtocol RPC " + |
| "server", getBlockProtocolServer().getBlockRpcAddress())); |
| getBlockProtocolServer().start(); |
| |
| LOG.info(buildRpcServerStartMessage("ScmDatanodeProtocl RPC " + |
| "server", getDatanodeProtocolServer().getDatanodeRpcAddress())); |
| getDatanodeProtocolServer().start(); |
| if (getSecurityProtocolServer() != null) { |
| getSecurityProtocolServer().start(); |
| } |
| |
| httpServer.start(); |
| scmBlockManager.start(); |
| |
| // Start jvm monitor |
| jvmPauseMonitor = new JvmPauseMonitor(); |
| jvmPauseMonitor.init(configuration); |
| jvmPauseMonitor.start(); |
| |
| setStartTime(); |
| } |
| |
| /** |
| * Stop service. |
| */ |
| public void stop() { |
| |
| try { |
| LOG.info("Stopping Replication Manager Service."); |
| replicationManager.stop(); |
| } catch (Exception ex) { |
| LOG.error("Replication manager service stop failed.", ex); |
| } |
| |
| try { |
| LOG.info("Stopping Lease Manager of the command watchers"); |
| commandWatcherLeaseManager.shutdown(); |
| } catch (Exception ex) { |
| LOG.error("Lease Manager of the command watchers stop failed"); |
| } |
| |
| try { |
| LOG.info("Stopping datanode service RPC server"); |
| getDatanodeProtocolServer().stop(); |
| |
| } catch (Exception ex) { |
| LOG.error("Storage Container Manager datanode RPC stop failed.", ex); |
| } |
| |
| try { |
| LOG.info("Stopping block service RPC server"); |
| getBlockProtocolServer().stop(); |
| } catch (Exception ex) { |
| LOG.error("Storage Container Manager blockRpcServer stop failed.", ex); |
| } |
| |
| try { |
| LOG.info("Stopping the StorageContainerLocationProtocol RPC server"); |
| getClientProtocolServer().stop(); |
| } catch (Exception ex) { |
| LOG.error("Storage Container Manager clientRpcServer stop failed.", ex); |
| } |
| |
| try { |
| LOG.info("Stopping Storage Container Manager HTTP server."); |
| httpServer.stop(); |
| } catch (Exception ex) { |
| LOG.error("Storage Container Manager HTTP server stop failed.", ex); |
| } |
| |
| if (getSecurityProtocolServer() != null) { |
| getSecurityProtocolServer().stop(); |
| } |
| |
| try { |
| LOG.info("Stopping Block Manager Service."); |
| scmBlockManager.stop(); |
| } catch (Exception ex) { |
| LOG.error("SCM block manager service stop failed.", ex); |
| } |
| |
| if (containerReportCache != null) { |
| containerReportCache.invalidateAll(); |
| containerReportCache.cleanUp(); |
| } |
| |
| if (metrics != null) { |
| metrics.unRegister(); |
| } |
| |
| unregisterMXBean(); |
| if (scmContainerMetrics != null) { |
| scmContainerMetrics.unRegister(); |
| } |
| |
| // Event queue must be stopped before the DB store is closed at the end. |
| try { |
| LOG.info("Stopping SCM Event Queue."); |
| eventQueue.close(); |
| } catch (Exception ex) { |
| LOG.error("SCM Event Queue stop failed", ex); |
| } |
| |
| if (jvmPauseMonitor != null) { |
| jvmPauseMonitor.stop(); |
| } |
| IOUtils.cleanupWithLogger(LOG, containerManager); |
| IOUtils.cleanupWithLogger(LOG, pipelineManager); |
| |
| try { |
| scmMetadataStore.stop(); |
| } catch (Exception ex) { |
| LOG.error("SCM Metadata store stop failed", ex); |
| } |
| |
| if (ms != null) { |
| ms.stop(); |
| } |
| |
| scmSafeModeManager.stop(); |
| } |
| |
| /** |
| * Wait until service has completed shutdown. |
| */ |
| public void join() { |
| try { |
| getBlockProtocolServer().join(); |
| getClientProtocolServer().join(); |
| getDatanodeProtocolServer().join(); |
| if (getSecurityProtocolServer() != null) { |
| getSecurityProtocolServer().join(); |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| LOG.info("Interrupted during StorageContainerManager join."); |
| } |
| } |
| |
| /** |
| * Returns the Number of Datanodes that are communicating with SCM. |
| * |
| * @param nodestate Healthy, Dead etc. |
| * @return int -- count |
| */ |
| public int getNodeCount(NodeState nodestate) { |
| return scmNodeManager.getNodeCount(nodestate); |
| } |
| |
| /** |
| * Returns SCM container manager. |
| */ |
| @VisibleForTesting |
| public ContainerManager getContainerManager() { |
| return containerManager; |
| } |
| |
| /** |
| * Returns node manager. |
| * |
| * @return - Node Manager |
| */ |
| @VisibleForTesting |
| public NodeManager getScmNodeManager() { |
| return scmNodeManager; |
| } |
| |
| /** |
| * Returns pipeline manager. |
| * |
| * @return - Pipeline Manager |
| */ |
| @VisibleForTesting |
| public PipelineManager getPipelineManager() { |
| return pipelineManager; |
| } |
| |
| @VisibleForTesting |
| public BlockManager getScmBlockManager() { |
| return scmBlockManager; |
| } |
| |
| @VisibleForTesting |
| public SafeModeHandler getSafeModeHandler() { |
| return safeModeHandler; |
| } |
| |
| @VisibleForTesting |
| public SCMSafeModeManager getScmSafeModeManager() { |
| return scmSafeModeManager; |
| } |
| |
| @VisibleForTesting |
| public ReplicationManager getReplicationManager() { |
| return replicationManager; |
| } |
| |
| public void checkAdminAccess(String remoteUser) throws IOException { |
| if (remoteUser != null) { |
| if (!scmAdminUsernames.contains(remoteUser)) { |
| throw new IOException( |
| "Access denied for user " + remoteUser + ". Superuser privilege " + |
| "is required."); |
| } |
| } |
| } |
| |
| /** |
| * Invalidate container stat entry for given datanode. |
| * |
| * @param datanodeUuid |
| */ |
| public void removeContainerReport(String datanodeUuid) { |
| synchronized (containerReportCache) { |
| containerReportCache.invalidate(datanodeUuid); |
| } |
| } |
| |
| /** |
| * Get container stat of specified datanode. |
| * |
| * @param datanodeUuid |
| * @return |
| */ |
| public ContainerStat getContainerReport(String datanodeUuid) { |
| ContainerStat stat = null; |
| synchronized (containerReportCache) { |
| stat = containerReportCache.getIfPresent(datanodeUuid); |
| } |
| |
| return stat; |
| } |
| |
| /** |
| * Returns a view of the container stat entries. Modifications made to the |
| * map will directly |
| * affect the cache. |
| * |
| * @return |
| */ |
| public ConcurrentMap<String, ContainerStat> getContainerReportCache() { |
| return containerReportCache.asMap(); |
| } |
| |
| @Override |
| public Map<String, String> getContainerReport() { |
| Map<String, String> id2StatMap = new HashMap<>(); |
| synchronized (containerReportCache) { |
| ConcurrentMap<String, ContainerStat> map = containerReportCache.asMap(); |
| for (Map.Entry<String, ContainerStat> entry : map.entrySet()) { |
| id2StatMap.put(entry.getKey(), entry.getValue().toJsonString()); |
| } |
| } |
| |
| return id2StatMap; |
| } |
| |
| /** |
| * Returns live safe mode container threshold. |
| * |
| * @return String |
| */ |
| @Override |
| public double getSafeModeCurrentContainerThreshold() { |
| return getCurrentContainerThreshold(); |
| } |
| |
| /** |
| * Returns safe mode status. |
| * @return boolean |
| */ |
| @Override |
| public boolean isInSafeMode() { |
| return scmSafeModeManager.getInSafeMode(); |
| } |
| |
| /** |
| * Returns EventPublisher. |
| */ |
| public EventPublisher getEventQueue() { |
| return eventQueue; |
| } |
| |
| /** |
| * Force SCM out of safe mode. |
| */ |
| public boolean exitSafeMode() { |
| scmSafeModeManager.exitSafeMode(eventQueue); |
| return true; |
| } |
| |
| @VisibleForTesting |
| public double getCurrentContainerThreshold() { |
| return scmSafeModeManager.getCurrentContainerThreshold(); |
| } |
| |
| @Override |
| public Map<String, Integer> getContainerStateCount() { |
| Map<String, Integer> nodeStateCount = new HashMap<>(); |
| for (HddsProtos.LifeCycleState state : HddsProtos.LifeCycleState.values()) { |
| nodeStateCount.put(state.toString(), |
| containerManager.getContainerCountByState(state)); |
| } |
| return nodeStateCount; |
| } |
| |
| /** |
| * Returns the SCM metadata Store. |
| * @return SCMMetadataStore |
| */ |
| public SCMMetadataStore getScmMetadataStore() { |
| return scmMetadataStore; |
| } |
| |
| /** |
| * Returns the SCM network topology cluster. |
| * @return NetworkTopology |
| */ |
| public NetworkTopology getClusterMap() { |
| return this.clusterMap; |
| } |
| } |