| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license |
| * agreements. See the NOTICE file distributed with this work for additional |
| * information regarding |
| * copyright ownership. The ASF licenses this file to you under the Apache |
| * License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the |
| * License. You may obtain a |
| * copy of the License at |
| * |
| * <p>http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * <p>Unless required by applicable law or agreed to in writing, software |
| * distributed under the |
| * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR |
| * CONDITIONS OF ANY KIND, either |
| * express or implied. See the License for the specific language governing |
| * permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdds.scm.server; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.RemovalListener; |
| import com.google.protobuf.BlockingService; |
| |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdds.HddsConfigKeys; |
| import org.apache.hadoop.hdds.HddsUtils; |
| import org.apache.hadoop.hdds.annotation.InterfaceAudience; |
| import org.apache.hadoop.hdds.conf.ConfigurationSource; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; |
| import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; |
| import org.apache.hadoop.hdds.scm.PlacementPolicy; |
| import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl; |
| import org.apache.hadoop.hdds.scm.container.ContainerManager; |
| import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy; |
| import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; |
| import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager; |
| import org.apache.hadoop.hdds.scm.container.replication.OverReplicatedProcessor; |
| import org.apache.hadoop.hdds.scm.container.replication.UnderReplicatedProcessor; |
| import org.apache.hadoop.hdds.scm.crl.CRLStatusReportHandler; |
| import org.apache.hadoop.hdds.scm.ha.BackgroundSCMService; |
| import org.apache.hadoop.hdds.scm.ha.HASecurityUtils; |
| import org.apache.hadoop.hdds.scm.ha.SCMContext; |
| import org.apache.hadoop.hdds.scm.ha.SCMHAManager; |
| import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl; |
| import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails; |
| import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; |
| import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; |
| import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; |
| import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails; |
| import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl; |
| import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; |
| import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator; |
| import org.apache.hadoop.hdds.scm.ScmInfo; |
| import org.apache.hadoop.hdds.scm.node.NodeAddressUpdateHandler; |
| import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager; |
| import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManagerImpl; |
| import org.apache.hadoop.hdds.scm.node.CommandQueueReportHandler; |
| import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager; |
| import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl; |
| import org.apache.hadoop.hdds.scm.server.upgrade.SCMUpgradeFinalizationContext; |
| import org.apache.hadoop.hdds.scm.server.upgrade.ScmHAUnfinalizedStateValidationAction; |
| import org.apache.hadoop.hdds.scm.pipeline.WritableContainerFactory; |
| import org.apache.hadoop.hdds.security.token.ContainerTokenGenerator; |
| import org.apache.hadoop.hdds.security.token.ContainerTokenSecretManager; |
| import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore; |
| import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultCAProfile; |
| import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultProfile; |
| import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient; |
| import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec; |
| import org.apache.hadoop.hdds.server.ServerUtils; |
| import org.apache.hadoop.hdds.server.events.EventExecutor; |
| import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor; |
| import org.apache.hadoop.hdds.server.http.RatisDropwizardExports; |
| import org.apache.hadoop.hdds.utils.HAUtils; |
| import org.apache.hadoop.hdds.utils.HddsServerUtil; |
| import org.apache.hadoop.hdds.scm.ScmConfig; |
| import org.apache.hadoop.hdds.scm.ScmConfigKeys; |
| import org.apache.hadoop.hdds.scm.block.BlockManager; |
| import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; |
| import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; |
| import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; |
| import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; |
| import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; |
| import org.apache.hadoop.hdds.scm.container.ContainerID; |
| import org.apache.hadoop.hdds.scm.container.ContainerInfo; |
| import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; |
| import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler; |
| import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; |
| import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer; |
| import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory; |
| import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics; |
| import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; |
| import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics; |
| import org.apache.hadoop.hdds.scm.events.SCMEvents; |
| import org.apache.hadoop.hdds.scm.exceptions.SCMException; |
| import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; |
| import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; |
| import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl; |
| import org.apache.hadoop.hdds.scm.net.NetworkTopology; |
| import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; |
| import org.apache.hadoop.hdds.scm.node.DeadNodeHandler; |
| import org.apache.hadoop.hdds.scm.node.NewNodeHandler; |
| import org.apache.hadoop.hdds.scm.node.StartDatanodeAdminHandler; |
| import org.apache.hadoop.hdds.scm.node.NodeManager; |
| import org.apache.hadoop.hdds.scm.node.NodeReportHandler; |
| import org.apache.hadoop.hdds.scm.node.HealthyReadOnlyNodeHandler; |
| import org.apache.hadoop.hdds.scm.node.ReadOnlyHealthyToHealthyNodeHandler; |
| import org.apache.hadoop.hdds.scm.node.SCMNodeManager; |
| import org.apache.hadoop.hdds.scm.node.StaleNodeHandler; |
| import org.apache.hadoop.hdds.scm.node.NodeDecommissionManager; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler; |
| import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.PipelineChoosePolicyFactory; |
| import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; |
| import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; |
| import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode; |
| import org.apache.hadoop.hdds.security.OzoneSecurityException; |
| import org.apache.hadoop.hdds.security.x509.SecurityConfig; |
| import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer; |
| import org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer; |
| import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; |
| import org.apache.hadoop.hdds.server.events.EventPublisher; |
| import org.apache.hadoop.hdds.server.events.EventQueue; |
| import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager; |
| import org.apache.hadoop.hdds.utils.HddsVersionInfo; |
| import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.metrics2.MetricsSystem; |
| import org.apache.hadoop.metrics2.util.MBeans; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.ozone.OzoneConfigKeys; |
| import org.apache.hadoop.ozone.OzoneSecurityUtil; |
| import org.apache.hadoop.ozone.common.MonotonicClock; |
| import org.apache.hadoop.ozone.common.Storage.StorageState; |
| import org.apache.hadoop.ozone.lease.LeaseManager; |
| import org.apache.hadoop.ozone.upgrade.DefaultUpgradeFinalizationExecutor; |
| import org.apache.hadoop.ozone.upgrade.UpgradeFinalizationExecutor; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; |
| import org.apache.hadoop.security.authentication.client.AuthenticationException; |
| import org.apache.hadoop.util.JvmPauseMonitor; |
| import org.apache.ratis.util.ExitUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.management.ObjectName; |
| import java.io.IOException; |
| import java.io.UncheckedIOException; |
| import java.math.BigInteger; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.security.cert.CertificateException; |
| import java.security.cert.X509Certificate; |
| import java.time.Clock; |
| import java.time.ZoneOffset; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT; |
| import static org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore.CertType.VALID_CERTS; |
| import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD; |
| import static org.apache.hadoop.ozone.OzoneConsts.CRL_SEQUENCE_ID_KEY; |
| import static org.apache.hadoop.ozone.OzoneConsts.SCM_SUB_CA_PREFIX; |
| import static org.apache.hadoop.ozone.OzoneConsts.SCM_ROOT_CA_COMPONENT_NAME; |
| |
| /** |
| * StorageContainerManager is the main entry point for the service that |
| * provides information about |
| * which SCM nodes host containers. |
| * |
| * <p>DataNodes report to StorageContainerManager using heartbeat messages. |
| * SCM allocates containers |
| * and returns a pipeline. |
| * |
| * <p>A client once it gets a pipeline (a list of datanodes) will connect to |
| * the datanodes and create a container, which then can be used to store data. |
| */ |
| @InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"}) |
| public final class StorageContainerManager extends ServiceRuntimeInfoImpl |
| implements SCMMXBean, OzoneStorageContainerManager { |
| |
| // Class-wide SLF4J logger. |
| private static final Logger LOG = LoggerFactory |
| .getLogger(StorageContainerManager.class); |
| |
| /** |
| * SCM metrics. Declared static, so the reference is shared by all instances |
| * of this class. |
| */ |
| private static SCMMetrics metrics; |
| |
| /* |
| * RPC Endpoints exposed by SCM. |
| * The datanode, block and client servers are created in the constructor; |
| * securityProtocolServer is explicitly set to null there when security is |
| * disabled. |
| */ |
| private final SCMDatanodeProtocolServer datanodeProtocolServer; |
| private final SCMBlockProtocolServer blockProtocolServer; |
| private final SCMClientProtocolServer clientProtocolServer; |
| private SCMSecurityProtocolServer securityProtocolServer; |
| |
| /* |
| * State Managers of SCM. |
| * Several of these are assigned in initializeSystemManagers(), which uses |
| * the instance supplied by the SCMConfigurator when there is one and builds |
| * a default implementation otherwise. |
| */ |
| private NodeManager scmNodeManager; |
| private PipelineManager pipelineManager; |
| private ContainerManager containerManager; |
| private BlockManager scmBlockManager; |
| private final SCMStorageConfig scmStorageConfig; |
| private NodeDecommissionManager scmDecommissionManager; |
| private WritableContainerFactory writableContainerFactory; |
| private FinalizationManager finalizationManager; |
| private HDDSLayoutVersionManager scmLayoutVersionManager; |
| |
| // Metadata store, certificate store and HA-related state. |
| private SCMMetadataStore scmMetadataStore; |
| private CertificateStore certificateStore; |
| private SCMHAManager scmHAManager; |
| private SCMContext scmContext; |
| private SequenceIdGenerator sequenceIdGen; |
| |
| // Event queue on which initializeEventHandlers() registers its handlers, |
| // and the service manager that background services are registered with. |
| private final EventQueue eventQueue; |
| private final SCMServiceManager serviceManager; |
| |
| /* |
| * HTTP endpoint for JMX access. |
| */ |
| private StorageContainerManagerHttpServer httpServer; |
| /** |
| * Usernames treated as SCM administrators. Read from |
| * OzoneConfigKeys.OZONE_ADMINISTRATORS in the constructor, which also adds |
| * the short name of the current user when it is not already listed. |
| */ |
| private final Collection<String> scmAdminUsernames; |
| /** |
| * SCM mxbean. |
| */ |
| private ObjectName scmInfoBeanName; |
| /** |
| * Key = DatanodeUuid, value = ContainerStat. |
| */ |
| private final Cache<String, ContainerStat> containerReportCache; |
| |
| private ReplicationManager replicationManager; |
| |
| // Lease manager created in the constructor with the name "CommandWatcher"; |
| // its timeout is read from ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT. |
| private final LeaseManager<Long> commandWatcherLeaseManager; |
| |
| private SCMSafeModeManager scmSafeModeManager; |
| // Created by initializeCertificateClient() only when security is enabled |
| // and the primary SCM id has been initialized; that method leaves it unset |
| // otherwise. |
| private SCMCertificateClient scmCertificateClient; |
| private ContainerTokenSecretManager containerTokenMgr; |
| |
| private JvmPauseMonitor jvmPauseMonitor; |
| // The configuration object handed to the constructor. |
| private final OzoneConfiguration configuration; |
| private SCMContainerMetrics scmContainerMetrics; |
| private SCMContainerPlacementMetrics placementMetrics; |
| private PlacementPolicy containerPlacementPolicy; |
| private PlacementPolicy ecContainerPlacementPolicy; |
| private PlacementPolicyValidateProxy placementPolicyValidateProxy; |
| private MetricsSystem ms; |
| // Handed to RatisDropwizardExports.registerRatisMetricReporters() before |
| // the SCM HA manager is created. |
| private final Map<String, RatisDropwizardExports> ratisMetricsMap = |
| new ConcurrentHashMap<>(); |
| // Read from SCMStorageConfig.getPrimaryScmNodeId() in the constructor. |
| private String primaryScmNodeId; |
| |
| /** |
| * Network topology Map. |
| */ |
| private NetworkTopology clusterMap; |
| private PipelineChoosePolicy pipelineChoosePolicy; |
| private SecurityConfig securityConfig; |
| |
| // Loaded in the constructor via SCMHANodeDetails.loadSCMHAConfig(). |
| private final SCMHANodeDetails scmHANodeDetails; |
| |
| private ContainerBalancer containerBalancer; |
| private StatefulServiceStateManager statefulServiceStateManager; |
| // Used to keep track of pending replication and pending deletes for |
| // container replicas. |
| private ContainerReplicaPendingOps containerReplicaPendingOps; |
| |
| /** |
| * Creates a new StorageContainerManager with a default, empty |
| * {@link SCMConfigurator}, so the default implementation of every manager |
| * is used. Configuration will be updated with information on the actual |
| * listening addresses used for RPC servers. |
| * |
| * @param conf configuration |
| * @throws IOException propagated from the two-argument constructor |
| * @throws AuthenticationException propagated from the two-argument |
| * constructor |
| */ |
| @VisibleForTesting |
| public StorageContainerManager(OzoneConfiguration conf) |
| throws IOException, AuthenticationException { |
| // default empty configurator means default managers will be used. |
| this(conf, new SCMConfigurator()); |
| } |
| |
| /** |
| * This constructor offers finer control over how SCM comes up. |
| * To use this, user needs to create a SCMConfigurator and set various |
| * managers that user wants SCM to use, if a value is missing then SCM will |
| * use the default value for that manager. |
| * |
| * @param conf - Configuration |
| * @param configurator - configurator |
| */ |
| @SuppressWarnings("checkstyle:methodlength") |
| private StorageContainerManager(OzoneConfiguration conf, |
| SCMConfigurator configurator) |
| throws IOException, AuthenticationException { |
| super(HddsVersionInfo.HDDS_VERSION_INFO); |
| |
| Objects.requireNonNull(configurator, "configurator cannot not be null"); |
| Objects.requireNonNull(conf, "configuration cannot not be null"); |
| /** |
| * It is assumed the scm --init command creates the SCM Storage Config. |
| */ |
| scmStorageConfig = new SCMStorageConfig(conf); |
| |
| scmHANodeDetails = SCMHANodeDetails.loadSCMHAConfig(conf, scmStorageConfig); |
| configuration = conf; |
| initMetrics(); |
| containerReportCache = buildContainerReportCache(); |
| |
| if (scmStorageConfig.getState() != StorageState.INITIALIZED) { |
| String errMsg = "Please make sure you have run \'ozone scm --init\' " + |
| "command to generate all the required metadata to " + |
| scmStorageConfig.getStorageDir(); |
| if (SCMHAUtils.isSCMHAEnabled(conf) && !scmStorageConfig |
| .isSCMHAEnabled()) { |
| errMsg += " or make sure you have run \'ozone scm --bootstrap\' cmd to " |
| + "add the SCM to existing SCM HA group"; |
| } |
| LOG.error(errMsg + "."); |
| throw new SCMException("SCM not initialized due to storage config " + |
| "failure.", ResultCodes.SCM_NOT_INITIALIZED); |
| } |
| |
| primaryScmNodeId = scmStorageConfig.getPrimaryScmNodeId(); |
| initializeCertificateClient(); |
| |
| /** |
| * Important : This initialization sequence is assumed by some of our tests. |
| * The testSecureOzoneCluster assumes that security checks have to be |
| * passed before any artifacts like SCM DB is created. So please don't |
| * add any other initialization above the Security checks please. |
| */ |
| if (OzoneSecurityUtil.isSecurityEnabled(conf)) { |
| loginAsSCMUserIfSecurityEnabled(scmHANodeDetails, conf); |
| } |
| |
| // Creates the SCM DBs or opens them if it exists. |
| // A valid pointer to the store is required by all the other services below. |
| initalizeMetadataStore(conf, configurator); |
| |
| eventQueue = new EventQueue(); |
| serviceManager = new SCMServiceManager(); |
| |
| long watcherTimeout = |
| conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, |
| HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); |
| commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher", |
| watcherTimeout); |
| initializeSystemManagers(conf, configurator); |
| |
| // Authenticate SCM if security is enabled, this initialization can only |
| // be done after the metadata store is initialized. |
| if (OzoneSecurityUtil.isSecurityEnabled(conf)) { |
| initializeCAnSecurityProtocol(conf, configurator); |
| } else { |
| // if no Security, we do not create a Certificate Server at all. |
| // This allows user to boot SCM without security temporarily |
| // and then come back and enable it without any impact. |
| securityProtocolServer = null; |
| } |
| |
| scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys |
| .OZONE_ADMINISTRATORS); |
| String scmShortUsername = |
| UserGroupInformation.getCurrentUser().getShortUserName(); |
| |
| if (!scmAdminUsernames.contains(scmShortUsername)) { |
| scmAdminUsernames.add(scmShortUsername); |
| } |
| |
| datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this, |
| eventQueue); |
| blockProtocolServer = new SCMBlockProtocolServer(conf, this); |
| clientProtocolServer = new SCMClientProtocolServer(conf, this); |
| |
| initializeEventHandlers(); |
| |
| containerBalancer = new ContainerBalancer(this); |
| LOG.info(containerBalancer.toString()); |
| |
| // Emit initial safe mode status, as now handlers are registered. |
| scmSafeModeManager.emitSafeModeStatus(); |
| |
| registerMXBean(); |
| registerMetricsSource(this); |
| } |
| |
| |
| private void initializeEventHandlers() { |
| CloseContainerEventHandler closeContainerHandler = |
| new CloseContainerEventHandler( |
| pipelineManager, containerManager, scmContext); |
| NodeReportHandler nodeReportHandler = |
| new NodeReportHandler(scmNodeManager); |
| CommandQueueReportHandler commandQueueReportHandler = |
| new CommandQueueReportHandler(scmNodeManager); |
| PipelineReportHandler pipelineReportHandler = |
| new PipelineReportHandler( |
| scmSafeModeManager, pipelineManager, scmContext, configuration); |
| CommandStatusReportHandler cmdStatusReportHandler = |
| new CommandStatusReportHandler(); |
| |
| NewNodeHandler newNodeHandler = new NewNodeHandler(pipelineManager, |
| scmDecommissionManager, configuration, serviceManager); |
| NodeAddressUpdateHandler nodeAddressUpdateHandler = |
| new NodeAddressUpdateHandler(pipelineManager, |
| scmDecommissionManager, serviceManager); |
| StaleNodeHandler staleNodeHandler = |
| new StaleNodeHandler(scmNodeManager, pipelineManager, configuration); |
| DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager, |
| pipelineManager, containerManager); |
| StartDatanodeAdminHandler datanodeStartAdminHandler = |
| new StartDatanodeAdminHandler(scmNodeManager, pipelineManager); |
| ReadOnlyHealthyToHealthyNodeHandler readOnlyHealthyToHealthyNodeHandler = |
| new ReadOnlyHealthyToHealthyNodeHandler(configuration, serviceManager); |
| HealthyReadOnlyNodeHandler |
| healthyReadOnlyNodeHandler = |
| new HealthyReadOnlyNodeHandler(scmNodeManager, |
| pipelineManager); |
| ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); |
| |
| ContainerReportHandler containerReportHandler = |
| new ContainerReportHandler(scmNodeManager, containerManager, |
| scmContext, configuration); |
| |
| IncrementalContainerReportHandler incrementalContainerReportHandler = |
| new IncrementalContainerReportHandler( |
| scmNodeManager, containerManager, scmContext); |
| PipelineActionHandler pipelineActionHandler = |
| new PipelineActionHandler(pipelineManager, scmContext, configuration); |
| CRLStatusReportHandler crlStatusReportHandler = |
| new CRLStatusReportHandler(certificateStore, configuration); |
| |
| eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); |
| eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager); |
| eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); |
| eventQueue.addHandler(SCMEvents.COMMAND_QUEUE_REPORT, |
| commandQueueReportHandler); |
| |
| // Use the same executor for both ICR and FCR. |
| // The Executor maps the event to a thread for DN. |
| // Dispatcher should always dispatch FCR first followed by ICR |
| List<ThreadPoolExecutor> executors = |
| FixedThreadPoolWithAffinityExecutor.initializeExecutorPool( |
| SCMEvents.CONTAINER_REPORT.getName() |
| + "_OR_" |
| + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName()); |
| |
| EventExecutor<ContainerReportFromDatanode> |
| containerReportExecutors = |
| new FixedThreadPoolWithAffinityExecutor<>( |
| EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT, |
| containerReportHandler), |
| executors); |
| EventExecutor<IncrementalContainerReportFromDatanode> |
| incrementalReportExecutors = |
| new FixedThreadPoolWithAffinityExecutor<>( |
| EventQueue.getExecutorName( |
| SCMEvents.INCREMENTAL_CONTAINER_REPORT, |
| incrementalContainerReportHandler), |
| executors); |
| |
| eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportExecutors, |
| containerReportHandler); |
| eventQueue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT, |
| incrementalReportExecutors, |
| incrementalContainerReportHandler); |
| eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler); |
| eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler); |
| eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler); |
| eventQueue.addHandler(SCMEvents.NODE_ADDRESS_UPDATE, |
| nodeAddressUpdateHandler); |
| eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler); |
| eventQueue.addHandler(SCMEvents.HEALTHY_READONLY_TO_HEALTHY_NODE, |
| readOnlyHealthyToHealthyNodeHandler); |
| eventQueue.addHandler(SCMEvents.HEALTHY_READONLY_NODE, |
| healthyReadOnlyNodeHandler); |
| eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler); |
| eventQueue.addHandler(SCMEvents.START_ADMIN_ON_NODE, |
| datanodeStartAdminHandler); |
| eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler); |
| eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS, |
| (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog()); |
| eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); |
| eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); |
| eventQueue.addHandler(SCMEvents.CRL_STATUS_REPORT, crlStatusReportHandler); |
| |
| } |
| |
| /** |
| * Builds the security config and, when security is enabled and the primary |
| * SCM id has been initialized, the SCM certificate client. |
| */ |
| private void initializeCertificateClient() { |
| securityConfig = new SecurityConfig(configuration); |
| if (!OzoneSecurityUtil.isSecurityEnabled(configuration)) { |
| // Security is off: no certificate client is created here. |
| return; |
| } |
| if (!scmStorageConfig.checkPrimarySCMIdInitialized()) { |
| return; |
| } |
| scmCertificateClient = new SCMCertificateClient( |
| securityConfig, scmStorageConfig.getScmCertSerialId()); |
| } |
| |
| /** |
| * Returns the configuration this SCM instance was constructed with. |
| * |
| * @return the configuration object passed to the constructor |
| */ |
| public OzoneConfiguration getConfiguration() { |
| return this.configuration; |
| } |
| |
| /** |
| * Factory method: builds an SCM instance from the supplied configuration |
| * and configurator. |
| * |
| * @param conf HDDS configuration |
| * @param configurator SCM configurator |
| * @return SCM instance |
| * @throws IOException propagated from the constructor |
| * @throws AuthenticationException propagated from the constructor |
| */ |
| public static StorageContainerManager createSCM( |
| OzoneConfiguration conf, SCMConfigurator configurator) |
| throws IOException, AuthenticationException { |
| final StorageContainerManager scm = |
| new StorageContainerManager(conf, configurator); |
| return scm; |
| } |
| |
| /** |
| * Factory method: builds an SCM instance from the supplied configuration, |
| * using a freshly created, empty {@link SCMConfigurator}. |
| * |
| * @param conf HDDS configuration |
| * @return SCM instance |
| * @throws IOException propagated from the two-argument createSCM |
| * @throws AuthenticationException propagated from the two-argument |
| * createSCM |
| */ |
| public static StorageContainerManager createSCM(OzoneConfiguration conf) |
| throws IOException, AuthenticationException { |
| final SCMConfigurator defaultConfigurator = new SCMConfigurator(); |
| return createSCM(conf, defaultConfigurator); |
| } |
| |
| /** |
| * This function initializes the following managers. If the configurator |
| * specifies a value, we will use it, else we will use the default value. |
| * |
| * Node Manager |
| * Pipeline Manager |
| * Container Manager |
| * Block Manager |
| * Replication Manager |
| * Safe Mode Manager |
| * |
| * @param conf - Ozone Configuration. |
| * @param configurator - A customizer which allows different managers to be |
| * used if needed. |
| * @throws IOException - on Failure. |
| */ |
| @SuppressWarnings("methodLength") |
| private void initializeSystemManagers(OzoneConfiguration conf, |
| SCMConfigurator configurator) throws IOException { |
| Clock clock = new MonotonicClock(ZoneOffset.UTC); |
| |
| if (configurator.getNetworkTopology() != null) { |
| clusterMap = configurator.getNetworkTopology(); |
| } else { |
| clusterMap = new NetworkTopologyImpl(conf); |
| } |
| // This needs to be done before initializing Ratis. |
| RatisDropwizardExports.registerRatisMetricReporters(ratisMetricsMap); |
| if (configurator.getSCMHAManager() != null) { |
| scmHAManager = configurator.getSCMHAManager(); |
| } else { |
| scmHAManager = new SCMHAManagerImpl(conf, this); |
| } |
| |
| scmLayoutVersionManager = new HDDSLayoutVersionManager( |
| scmStorageConfig.getLayoutVersion()); |
| |
| UpgradeFinalizationExecutor<SCMUpgradeFinalizationContext> |
| finalizationExecutor; |
| if (configurator.getUpgradeFinalizationExecutor() != null) { |
| finalizationExecutor = configurator.getUpgradeFinalizationExecutor(); |
| } else { |
| finalizationExecutor = new DefaultUpgradeFinalizationExecutor<>(); |
| } |
| finalizationManager = new FinalizationManagerImpl.Builder() |
| .setConfiguration(conf) |
| .setLayoutVersionManager(scmLayoutVersionManager) |
| .setStorage(scmStorageConfig) |
| .setHAManager(scmHAManager) |
| .setFinalizationStore(scmMetadataStore.getMetaTable()) |
| .setFinalizationExecutor(finalizationExecutor) |
| .build(); |
| |
| // inline upgrade for SequenceIdGenerator |
| SequenceIdGenerator.upgradeToSequenceId(scmMetadataStore); |
| // Distributed sequence id generator |
| sequenceIdGen = new SequenceIdGenerator( |
| conf, scmHAManager, scmMetadataStore.getSequenceIdTable()); |
| |
| if (configurator.getScmContext() != null) { |
| scmContext = configurator.getScmContext(); |
| } else { |
| // When term equals SCMContext.INVALID_TERM, the isLeader() check |
| // and getTermOfLeader() will always pass. |
| long term = SCMHAUtils.isSCMHAEnabled(conf) ? 0 : SCMContext.INVALID_TERM; |
| // non-leader of term 0, in safe mode, preCheck not completed. |
| scmContext = new SCMContext.Builder() |
| .setLeader(false) |
| .setTerm(term) |
| .setIsInSafeMode(true) |
| .setIsPreCheckComplete(false) |
| .setSCM(this) |
| .setFinalizationCheckpoint(finalizationManager.getCheckpoint()) |
| .build(); |
| } |
| |
| if (configurator.getScmNodeManager() != null) { |
| scmNodeManager = configurator.getScmNodeManager(); |
| } else { |
| scmNodeManager = new SCMNodeManager(conf, scmStorageConfig, eventQueue, |
| clusterMap, scmContext, scmLayoutVersionManager); |
| } |
| |
| placementMetrics = SCMContainerPlacementMetrics.create(); |
| containerPlacementPolicy = |
| ContainerPlacementPolicyFactory.getPolicy(conf, scmNodeManager, |
| clusterMap, true, placementMetrics); |
| |
| ecContainerPlacementPolicy = ContainerPlacementPolicyFactory.getECPolicy( |
| conf, scmNodeManager, clusterMap, true, placementMetrics); |
| |
| placementPolicyValidateProxy = new PlacementPolicyValidateProxy( |
| containerPlacementPolicy, ecContainerPlacementPolicy); |
| |
| if (configurator.getPipelineManager() != null) { |
| pipelineManager = configurator.getPipelineManager(); |
| } else { |
| pipelineManager = |
| PipelineManagerImpl.newPipelineManager( |
| conf, |
| scmHAManager, |
| scmNodeManager, |
| scmMetadataStore.getPipelineTable(), |
| eventQueue, |
| scmContext, |
| serviceManager, |
| clock |
| ); |
| } |
| |
| finalizationManager.buildUpgradeContext(scmNodeManager, pipelineManager, |
| scmContext); |
| |
| containerReplicaPendingOps = new ContainerReplicaPendingOps(conf, clock); |
| |
| long containerReplicaOpScrubberIntervalMs = conf.getTimeDuration( |
| ScmConfigKeys |
| .OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL, |
| ScmConfigKeys |
| .OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL_DEFAULT, |
| TimeUnit.MILLISECONDS); |
| |
| long containerReplicaOpExpiryMs = conf.getTimeDuration( |
| ScmConfigKeys.OZONE_SCM_CONTAINER_REPLICA_OP_TIME_OUT, |
| ScmConfigKeys.OZONE_SCM_CONTAINER_REPLICA_OP_TIME_OUT_DEFAULT, |
| TimeUnit.MILLISECONDS); |
| |
| long backgroundServiceSafemodeWaitMs = conf.getTimeDuration( |
| HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, |
| HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT, |
| TimeUnit.MILLISECONDS); |
| |
| final String backgroundServiceName = "ExpiredContainerReplicaOpScrubber"; |
| BackgroundSCMService expiredContainerReplicaOpScrubber = |
| new BackgroundSCMService.Builder().setClock(clock) |
| .setScmContext(scmContext) |
| .setServiceName(backgroundServiceName) |
| .setIntervalInMillis(containerReplicaOpScrubberIntervalMs) |
| .setWaitTimeInMillis(backgroundServiceSafemodeWaitMs) |
| .setPeriodicalTask(() -> containerReplicaPendingOps |
| .removeExpiredEntries(containerReplicaOpExpiryMs)).build(); |
| |
| serviceManager.register(expiredContainerReplicaOpScrubber); |
| |
| if (configurator.getContainerManager() != null) { |
| containerManager = configurator.getContainerManager(); |
| } else { |
| containerManager = new ContainerManagerImpl(conf, scmHAManager, |
| sequenceIdGen, pipelineManager, scmMetadataStore.getContainerTable(), |
| containerReplicaPendingOps); |
| } |
| |
| pipelineChoosePolicy = PipelineChoosePolicyFactory.getPolicy(conf); |
| if (configurator.getWritableContainerFactory() != null) { |
| writableContainerFactory = configurator.getWritableContainerFactory(); |
| } else { |
| writableContainerFactory = new WritableContainerFactory(this); |
| } |
| if (configurator.getScmBlockManager() != null) { |
| scmBlockManager = configurator.getScmBlockManager(); |
| } else { |
| scmBlockManager = new BlockManagerImpl(conf, this); |
| } |
| if (configurator.getReplicationManager() != null) { |
| replicationManager = configurator.getReplicationManager(); |
| } else { |
| LegacyReplicationManager legacyRM = new LegacyReplicationManager( |
| conf, containerManager, containerPlacementPolicy, eventQueue, |
| scmContext, scmNodeManager, scmHAManager, clock, |
| getScmMetadataStore().getMoveTable()); |
| replicationManager = new ReplicationManager( |
| conf, |
| containerManager, |
| containerPlacementPolicy, |
| eventQueue, |
| scmContext, |
| scmNodeManager, |
| clock, |
| legacyRM, |
| containerReplicaPendingOps); |
| ReplicationManager.ReplicationManagerConfiguration rmConf = conf |
| .getObject(ReplicationManager.ReplicationManagerConfiguration.class); |
| |
| UnderReplicatedProcessor underReplicatedProcessor = |
| new UnderReplicatedProcessor(replicationManager, |
| containerReplicaPendingOps, eventQueue); |
| |
| BackgroundSCMService underReplicatedQueueThread = |
| new BackgroundSCMService.Builder().setClock(clock) |
| .setScmContext(scmContext) |
| .setServiceName("UnderReplicatedQueueThread") |
| .setIntervalInMillis(rmConf.getUnderReplicatedInterval()) |
| .setWaitTimeInMillis(backgroundServiceSafemodeWaitMs) |
| .setPeriodicalTask(underReplicatedProcessor::processAll).build(); |
| serviceManager.register(underReplicatedQueueThread); |
| |
| OverReplicatedProcessor overReplicatedProcessor = |
| new OverReplicatedProcessor(replicationManager, |
| containerReplicaPendingOps, eventQueue); |
| |
| BackgroundSCMService overReplicatedQueueThread = |
| new BackgroundSCMService.Builder().setClock(clock) |
| .setScmContext(scmContext) |
| .setServiceName("OverReplicatedQueueThread") |
| .setIntervalInMillis(rmConf.getOverReplicatedInterval()) |
| .setWaitTimeInMillis(backgroundServiceSafemodeWaitMs) |
| .setPeriodicalTask(overReplicatedProcessor::processAll).build(); |
| serviceManager.register(overReplicatedQueueThread); |
| } |
| serviceManager.register(replicationManager); |
| if (configurator.getScmSafeModeManager() != null) { |
| scmSafeModeManager = configurator.getScmSafeModeManager(); |
| } else { |
| scmSafeModeManager = new SCMSafeModeManager(conf, |
| containerManager.getContainers(), containerManager, |
| pipelineManager, eventQueue, serviceManager, scmContext); |
| } |
| |
| scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager, |
| containerManager, scmContext, eventQueue, replicationManager); |
| |
| statefulServiceStateManager = StatefulServiceStateManagerImpl.newBuilder() |
| .setStatefulServiceConfig( |
| scmMetadataStore.getStatefulServiceConfigTable()) |
| .setSCMDBTransactionBuffer(scmHAManager.getDBTransactionBuffer()) |
| .setRatisServer(scmHAManager.getRatisServer()) |
| .build(); |
| } |
| |
| /** |
|  * Builds the certificate store, the CA server(s) and the security protocol |
|  * server from the supplied configurator. |
|  * |
|  * <p>If the primary SCM id is initialized in the storage config, this SCM |
|  * gets its own sub-CA server and a root CA server is set up only when this |
|  * node is the primary SCM. Otherwise one root CA server is used for both |
|  * roles. A certificate server provided by the configurator is preferred |
|  * over constructing a default one. |
|  * |
|  * @param conf - Config |
|  * @param configurator - configurator |
|  * @throws IOException - on Failure, e.g. when no metadata store is set |
|  */ |
| private void initializeCAnSecurityProtocol(OzoneConfiguration conf, |
|     SCMConfigurator configurator) throws IOException { |
| |
|   // TODO: Support Certificate Server loading via Class Name loader. |
|   // So it is easy to use different Certificate Servers if needed. |
|   if (this.scmMetadataStore == null) { |
|     LOG.error("Cannot initialize Certificate Server without a valid meta " + |
|         "data layer."); |
|     throw new SCMException("Cannot initialize CA without a valid metadata " + |
|         "store", ResultCodes.SCM_NOT_INITIALIZED); |
|   } |
| |
|   certificateStore = new SCMCertStore.Builder() |
|       .setMetadaStore(scmMetadataStore) |
|       .setRatisServer(scmHAManager.getRatisServer()) |
|       .setCRLSequenceId(getLastSequenceIdForCRL()) |
|       .build(); |
| |
|   final CertificateServer subCaServer; |
|   final CertificateServer rootCaServer; |
|   if (!scmStorageConfig.checkPrimarySCMIdInitialized()) { |
|     // An upgraded cluster never re-runs init, so the primary SCM node id |
|     // stays unset. Keep the pre-HA layout there: a single CA server that |
|     // issues certificates to DN and OM. |
|     rootCaServer = HASecurityUtils.initializeRootCertificateServer(conf, |
|         certificateStore, scmStorageConfig, new DefaultProfile()); |
|     subCaServer = rootCaServer; |
|   } else { |
|     // A primary SCM node id means init was run with SCM HA aware code, |
|     // so start the SCM specific CA server instance. |
|     String subject = SCM_SUB_CA_PREFIX + |
|         InetAddress.getLocalHost().getHostName(); |
|     if (configurator.getCertificateServer() == null) { |
|       subCaServer = new DefaultCAServer(subject, |
|           scmStorageConfig.getClusterID(), scmStorageConfig.getScmId(), |
|           certificateStore, new DefaultProfile(), |
|           scmCertificateClient.getComponentName()); |
|       // INTERMEDIARY_CA which issues certs to DN and OM. |
|       subCaServer.init(new SecurityConfig(configuration), |
|           CertificateServer.CAType.INTERMEDIARY_CA); |
|     } else { |
|       subCaServer = configurator.getCertificateServer(); |
|     } |
| |
|     if (!primaryScmNodeId.equals(scmStorageConfig.getScmId())) { |
|       rootCaServer = null; |
|     } else { |
|       if (configurator.getCertificateServer() == null) { |
|         rootCaServer = HASecurityUtils.initializeRootCertificateServer( |
|             conf, certificateStore, scmStorageConfig, |
|             new DefaultCAProfile()); |
|       } else { |
|         rootCaServer = configurator.getCertificateServer(); |
|       } |
|       persistPrimarySCMCerts(); |
|     } |
|   } |
| |
|   // The CA certificate known to the certificate client is handed over as |
|   // the root CA certificate, since for SCM the CA is the root CA. |
|   X509Certificate rootCaCert = null; |
|   if (scmCertificateClient != null) { |
|     rootCaCert = scmCertificateClient.getCACertificate(); |
|   } |
|   securityProtocolServer = new SCMSecurityProtocolServer(conf, |
|       rootCaServer, subCaServer, rootCaCert, this); |
| |
|   if (securityConfig.isContainerTokenEnabled()) { |
|     containerTokenMgr = createContainerTokenSecretManager(configuration); |
|   } |
| } |
| |
| /** Persist primary SCM root ca cert and sub-ca certs to DB. |
| * |
| * @throws IOException |
| */ |
| private void persistPrimarySCMCerts() throws IOException { |
| BigInteger certSerial = |
| scmCertificateClient.getCertificate().getSerialNumber(); |
| // Store the certificate in DB. On primary SCM when init happens, the |
| // certificate is not persisted to DB. As we don't have Metadatstore |
| // and ratis server initialized with statemachine. We need to do only |
| // for primary scm, for other bootstrapped scm's certificates will be |
| // persisted via ratis. |
| if (certificateStore.getCertificateByID(certSerial, |
| VALID_CERTS) == null) { |
| LOG.info("Storing sub-ca certificate serialId {} on primary SCM", |
| certSerial); |
| certificateStore.storeValidScmCertificate( |
| certSerial, scmCertificateClient.getCertificate()); |
| } |
| X509Certificate rootCACert = scmCertificateClient.getCACertificate(); |
| if (certificateStore.getCertificateByID(rootCACert.getSerialNumber(), |
| VALID_CERTS) == null) { |
| LOG.info("Storing root certificate serialId {}", |
| rootCACert.getSerialNumber()); |
| certificateStore.storeValidScmCertificate( |
| rootCACert.getSerialNumber(), rootCACert); |
| } |
| } |
| |
| /** |
|  * Returns the root certificate server held by the security protocol |
|  * server. |
|  */ |
| public CertificateServer getRootCertificateServer() { |
|   final SCMSecurityProtocolServer securityServer = |
|       getSecurityProtocolServer(); |
|   return securityServer.getRootCertificateServer(); |
| } |
| |
| /** |
|  * Returns the SCM certificate server held by the security protocol |
|  * server. |
|  */ |
| public CertificateServer getScmCertificateServer() { |
|   final SCMSecurityProtocolServer securityServer = |
|       getSecurityProtocolServer(); |
|   return securityServer.getScmCertificateServer(); |
| } |
| |
| /** |
|  * Returns the certificate client of this SCM; may be {@code null} if none |
|  * has been set. |
|  */ |
| public SCMCertificateClient getScmCertificateClient() { |
|   return this.scmCertificateClient; |
| } |
| |
| private ContainerTokenSecretManager createContainerTokenSecretManager( |
| OzoneConfiguration conf) throws IOException { |
| |
| long expiryTime = conf.getTimeDuration( |
| HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME, |
| HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME_DEFAULT, |
| TimeUnit.MILLISECONDS); |
| |
| // Means this is an upgraded cluster and it has no sub-ca, |
| // so SCM Certificate client is not initialized. To make Tokens |
| // work let's use root CA cert and create SCM Certificate client with |
| // root CA cert. |
| if (scmCertificateClient == null) { |
| Preconditions.checkState( |
| !scmStorageConfig.checkPrimarySCMIdInitialized()); |
| |
| String certSerialNumber; |
| try { |
| certSerialNumber = getScmCertificateServer().getCACertificate() |
| .getSerialNumber().toString(); |
| } catch (CertificateException ex) { |
| LOG.error("Get CA Certificate failed", ex); |
| throw new IOException(ex); |
| } catch (IOException ex) { |
| LOG.error("Get CA Certificate failed", ex); |
| throw ex; |
| } |
| scmCertificateClient = new SCMCertificateClient(securityConfig, |
| certSerialNumber, SCM_ROOT_CA_COMPONENT_NAME); |
| } |
| String certId = scmCertificateClient.getCertificate().getSerialNumber() |
| .toString(); |
| return new ContainerTokenSecretManager(securityConfig, |
| expiryTime, certId); |
| } |
| |
| /** |
|  * Selects the metadata store: the one supplied by the configurator if |
|  * present, otherwise a new {@code SCMMetadataStoreImpl}. |
|  * |
|  * @param conf - Config |
|  * @param configurator - configurator |
|  * @throws IOException - on Failure |
|  */ |
| private void initalizeMetadataStore(OzoneConfiguration conf, |
|     SCMConfigurator configurator) |
|     throws IOException { |
|   if (configurator.getMetadataStore() == null) { |
|     scmMetadataStore = new SCMMetadataStoreImpl(conf); |
|   } else { |
|     scmMetadataStore = configurator.getMetadataStore(); |
|   } |
| } |
| |
| /** |
| * Login as the configured user for SCM. |
| * |
| * @param conf |
| */ |
| private static void loginAsSCMUserIfSecurityEnabled( |
| SCMHANodeDetails scmhaNodeDetails, ConfigurationSource conf) |
| throws IOException, AuthenticationException { |
| if (OzoneSecurityUtil.isSecurityEnabled(conf)) { |
| if (LOG.isDebugEnabled()) { |
| ScmConfig scmConfig = conf.getObject(ScmConfig.class); |
| LOG.debug("Ozone security is enabled. Attempting login for SCM user. " |
| + "Principal: {}, keytab: {}", |
| scmConfig.getKerberosPrincipal(), |
| scmConfig.getKerberosKeytab()); |
| } |
| |
| Configuration hadoopConf = |
| LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); |
| if (SecurityUtil.getAuthenticationMethod(hadoopConf).equals( |
| AuthenticationMethod.KERBEROS)) { |
| UserGroupInformation.setConfiguration(hadoopConf); |
| InetSocketAddress socketAddress = getScmAddress(scmhaNodeDetails, conf); |
| SecurityUtil.login(hadoopConf, |
| ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY, |
| ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY, |
| socketAddress.getHostName()); |
| } else { |
| throw new AuthenticationException(SecurityUtil.getAuthenticationMethod( |
| hadoopConf) + " authentication method not support. " |
| + "SCM user login failed."); |
| } |
| LOG.info("SCM login successful."); |
| } |
| } |
| |
| long getLastSequenceIdForCRL() throws IOException { |
| Long sequenceId = |
| scmMetadataStore.getCRLSequenceIdTable().get(CRL_SEQUENCE_ID_KEY); |
| // If the CRL_SEQUENCE_ID_KEY does not exist in DB return 0 so that new |
| // CRL requests can have sequence id starting from 1. |
| if (sequenceId == null) { |
| return 0L; |
| } |
| // If there exists a last sequence id in the DB, the new incoming |
| // CRL requests must have sequence ids greater than the one stored in the DB |
| return sequenceId; |
| } |
| |
| /** |
|  * Builds the startup log message for an RPC server. |
|  * |
|  * @param description RPC server description |
|  * @param addr RPC server listening address, or {@code null} if the server |
|  *        was not started |
|  * @return server startup message |
|  */ |
| public static String buildRpcServerStartMessage(String description, |
|     InetSocketAddress addr) { |
|   if (addr == null) { |
|     return String.format("%s not started", description); |
|   } |
|   return String.format("%s is listening at %s", description, addr); |
| } |
| |
| /** |
| * Starts an RPC server, if configured. |
| * |
| * @param conf configuration |
| * @param addr configured address of RPC server |
| * @param protocol RPC protocol provided by RPC server |
| * @param instance RPC protocol implementation instance |
| * @param handlerCount RPC server handler count |
| * @return RPC server |
| * @throws IOException if there is an I/O error while creating RPC server |
| */ |
| public static RPC.Server startRpcServer( |
| OzoneConfiguration conf, |
| InetSocketAddress addr, |
| Class<?> protocol, |
| BlockingService instance, |
| int handlerCount) |
| throws IOException { |
| RPC.Server rpcServer = |
| new RPC.Builder(conf) |
| .setProtocol(protocol) |
| .setInstance(instance) |
| .setBindAddress(addr.getHostString()) |
| .setPort(addr.getPort()) |
| .setNumHandlers(handlerCount) |
| .setVerbose(false) |
| .setSecretManager(null) |
| .build(); |
| |
| HddsServerUtil.addPBProtocol(conf, protocol, instance, rpcServer); |
| return rpcServer; |
| } |
| |
| /** |
|  * Routine to bootstrap the StorageContainerManager. This will connect to a |
|  * running SCM instance which has valid cluster id and fetch the cluster id |
|  * from there. |
|  * |
|  * TODO: once SCM HA security is enabled, CSR certificates will be fetched |
|  * from running scm leader instance as well. |
|  * |
|  * @param conf OzoneConfiguration |
|  * @return true if SCM bootstrap is successful, false otherwise. |
|  * @throws AuthenticationException if the SCM user login fails |
|  * @throws IOException if init fails due to I/O error |
|  */ |
| public static boolean scmBootstrap(OzoneConfiguration conf) |
|     throws AuthenticationException, IOException { |
|   if (!SCMHAUtils.isSCMHAEnabled(conf)) { |
|     LOG.error("Bootstrap is not supported without SCM HA."); |
|     return false; |
|   } |
| |
|   final String primordialScm = SCMHAUtils.getPrimordialSCM(conf); |
|   final SCMStorageConfig storageConfig = new SCMStorageConfig(conf); |
|   final SCMHANodeDetails haNodeDetails = |
|       SCMHANodeDetails.loadSCMHAConfig(conf, storageConfig); |
|   final String localNodeId = |
|       haNodeDetails.getLocalNodeDetails().getNodeId(); |
|   final String localHostName = |
|       haNodeDetails.getLocalNodeDetails().getHostName(); |
| |
|   // Bootstrap is a no-op on the primordial SCM itself. |
|   final boolean runningOnPrimordial = primordialScm != null |
|       && SCMHAUtils.isSCMHAEnabled(conf) |
|       && SCMHAUtils.isPrimordialSCM(conf, localNodeId, localHostName); |
|   if (runningOnPrimordial) { |
|     LOG.info( |
|         "SCM bootstrap command can only be executed in non-Primordial SCM " |
|             + "{}, self id {} " + "Ignoring it.", primordialScm, localNodeId); |
|     return true; |
|   } |
| |
|   final String localClusterId = storageConfig.getClusterID(); |
|   final StorageState storageState = storageConfig.getState(); |
|   final boolean alreadyInitialized = |
|       storageState == StorageState.INITIALIZED; |
| |
|   if (alreadyInitialized && conf |
|       .getBoolean(ScmConfigKeys.OZONE_SCM_SKIP_BOOTSTRAP_VALIDATION_KEY, |
|           ScmConfigKeys.OZONE_SCM_SKIP_BOOTSTRAP_VALIDATION_DEFAULT)) { |
|     LOG.info("Skipping clusterId validation during bootstrap command. " |
|             + "ClusterId id {}, SCM id {}", localClusterId, |
|         storageConfig.getScmId()); |
| |
|     // Initialize security if security is enabled later. |
|     initializeSecurityIfNeeded(conf, haNodeDetails, storageConfig); |
| |
|     return true; |
|   } |
| |
|   loginAsSCMUserIfSecurityEnabled(haNodeDetails, conf); |
|   // Ask any of the other, already running SCM instances for the cluster |
|   // id; the local node id is removed from the configuration used for that. |
|   final OzoneConfiguration peerConf = |
|       SCMHAUtils.removeSelfId(conf, |
|           haNodeDetails.getLocalNodeDetails().getNodeId()); |
|   final ScmInfo remoteScmInfo = HAUtils.getScmInfo(peerConf); |
|   final String remoteClusterId = remoteScmInfo.getClusterId(); |
|   Preconditions.checkNotNull(remoteClusterId); |
| |
|   if (alreadyInitialized) { |
|     Preconditions.checkNotNull(storageConfig.getScmId()); |
|     if (!remoteClusterId.equals(localClusterId)) { |
|       LOG.error( |
|           "Could not bootstrap as SCM is already initialized with cluster " |
|               + "id {} but cluster id for existing leader SCM instance " |
|               + "is {}", localClusterId, remoteClusterId); |
|       return false; |
|     } |
| |
|     // Initialize security if security is enabled later. |
|     initializeSecurityIfNeeded(conf, haNodeDetails, storageConfig); |
|     return true; |
|   } |
| |
|   try { |
|     // The version file will record the cluster id fetched from the |
|     // running SCM together with the local SCM id. |
|     storageConfig.setClusterId(remoteClusterId); |
| |
|     // SCM Node info containing hostname to scm Id mappings |
|     // will be persisted into the version file once this node gets added |
|     // to existing SCM ring post node regular start up. |
| |
|     if (OzoneSecurityUtil.isSecurityEnabled(conf)) { |
|       HASecurityUtils.initializeSecurity(storageConfig, peerConf, |
|           getScmAddress(haNodeDetails, conf), false); |
|     } |
|     storageConfig.setPrimaryScmNodeId(remoteScmInfo.getScmId()); |
|     storageConfig.setSCMHAFlag(true); |
|     storageConfig.initialize(); |
|     LOG.info("SCM BootStrap is successful for ClusterID {}, SCMID {}", |
|         remoteScmInfo.getClusterId(), storageConfig.getScmId()); |
|     LOG.info("Primary SCM Node ID {}", |
|         storageConfig.getPrimaryScmNodeId()); |
|   } catch (IOException ioe) { |
|     LOG.error("Could not initialize SCM version file", ioe); |
|     return false; |
|   } |
|   return true; |
| } |
| |
| /** |
| * Initialize security If Ozone security is enabled and |
| * ScmStorageConfig does not have certificate serial id. |
| * @param conf |
| * @param scmhaNodeDetails |
| * @param scmStorageConfig |
| * @throws IOException |
| */ |
| private static void initializeSecurityIfNeeded(OzoneConfiguration conf, |
| SCMHANodeDetails scmhaNodeDetails, SCMStorageConfig scmStorageConfig) |
| throws IOException { |
| // Initialize security if security is enabled later. |
| if (OzoneSecurityUtil.isSecurityEnabled(conf) |
| && scmStorageConfig.getScmCertSerialId() == null) { |
| HASecurityUtils.initializeSecurity(scmStorageConfig, conf, |
| getScmAddress(scmhaNodeDetails, conf), true); |
| scmStorageConfig.forceInitialize(); |
| LOG.info("SCM unsecure cluster is converted to secure cluster. " + |
| "Persisted SCM Certificate SerialID {}", |
| scmStorageConfig.getScmCertSerialId()); |
| } |
| } |
| |
| /** |
|  * Routine to set up the Version info for StorageContainerManager. |
|  * |
|  * @param conf OzoneConfiguration |
|  * @param clusterId cluster id to use for a fresh init; ignored when null |
|  *        or empty, must parse as a UUID otherwise |
|  * @return true if SCM initialization is successful, false otherwise. |
|  * @throws IOException if init fails due to I/O error |
|  */ |
| public static boolean scmInit(OzoneConfiguration conf, |
|     String clusterId) throws IOException { |
|   SCMStorageConfig storageConfig = new SCMStorageConfig(conf); |
|   final StorageState storageState = storageConfig.getState(); |
|   final SCMHANodeDetails haNodeDetails = |
|       SCMHANodeDetails.loadSCMHAConfig(conf, storageConfig); |
|   final String primordialScm = SCMHAUtils.getPrimordialSCM(conf); |
|   final String localNodeId = haNodeDetails.getLocalNodeDetails().getNodeId(); |
|   final String localHostName = |
|       haNodeDetails.getLocalNodeDetails().getHostName(); |
| |
|   // Init is a no-op on every SCM other than the primordial one. |
|   final boolean notPrimordial = primordialScm != null |
|       && SCMHAUtils.isSCMHAEnabled(conf) |
|       && !SCMHAUtils.isPrimordialSCM(conf, localNodeId, localHostName); |
|   if (notPrimordial) { |
|     LOG.info( |
|         "SCM init command can only be executed in Primordial SCM {}, " |
|             + "self id {} " |
|             + "Ignoring it.", primordialScm, localNodeId); |
|     return true; |
|   } |
| |
|   if (storageState == StorageState.INITIALIZED) { |
|     // If SCM HA was not being used before pre-finalize, and is being used |
|     // when the cluster is pre-finalized for the SCM HA feature, init |
|     // should fail. |
|     ScmHAUnfinalizedStateValidationAction.checkScmHA(conf, storageConfig, |
|         new HDDSLayoutVersionManager(storageConfig.getLayoutVersion())); |
| |
|     final String existingClusterId = storageConfig.getClusterID(); |
|     final boolean haAlreadyPersisted = storageConfig.isSCMHAEnabled(); |
| |
|     // Initialize security if security is enabled later. |
|     initializeSecurityIfNeeded(conf, haNodeDetails, storageConfig); |
| |
|     if (SCMHAUtils.isSCMHAEnabled(conf) && !haAlreadyPersisted) { |
|       SCMRatisServerImpl.initialize(storageConfig.getClusterID(), |
|           storageConfig.getScmId(), haNodeDetails.getLocalNodeDetails(), |
|           conf); |
|       storageConfig.setSCMHAFlag(true); |
|       storageConfig.setPrimaryScmNodeId(storageConfig.getScmId()); |
|       storageConfig.forceInitialize(); |
|       LOG.debug("Enabled SCM HA"); |
|     } |
| |
|     LOG.info("SCM already initialized. Reusing existing cluster id for sd={}" |
|             + ";cid={}; layoutVersion={}; HAEnabled={}", |
|         storageConfig.getStorageDir(), existingClusterId, |
|         storageConfig.getLayoutVersion(), |
|         storageConfig.isSCMHAEnabled()); |
|     return true; |
|   } |
| |
|   try { |
|     if (clusterId != null && !clusterId.isEmpty()) { |
|       // clusterId must be an UUID |
|       Preconditions.checkNotNull(UUID.fromString(clusterId)); |
|       storageConfig.setClusterId(clusterId); |
|     } |
| |
|     if (OzoneSecurityUtil.isSecurityEnabled(conf)) { |
|       HASecurityUtils.initializeSecurity(storageConfig, conf, |
|           getScmAddress(haNodeDetails, conf), true); |
|     } |
| |
|     // Ordering matters here: |
|     // 1. SCM storage config initialize to create version file. |
|     // 2. Initialize Ratis server. |
|     // Once the SCM version file exists, a subsequent scm init uses the |
|     // clusterID from it. Initializing the storage config before the ratis |
|     // server therefore avoids leaving the ratis storage directory with |
|     // multiple raft group directories in a failure scenario. |
|     storageConfig.setPrimaryScmNodeId(storageConfig.getScmId()); |
|     storageConfig.initialize(); |
| |
|     if (SCMHAUtils.isSCMHAEnabled(conf)) { |
|       SCMRatisServerImpl.initialize(storageConfig.getClusterID(), |
|           storageConfig.getScmId(), haNodeDetails.getLocalNodeDetails(), |
|           conf); |
|       storageConfig = new SCMStorageConfig(conf); |
|       storageConfig.setSCMHAFlag(true); |
|       // Do force initialize to persist SCM_HA flag. |
|       storageConfig.forceInitialize(); |
|     } |
| |
|     LOG.info("SCM initialization succeeded. Current cluster id for sd={}" |
|             + "; cid={}; layoutVersion={}; scmId={}", |
|         storageConfig.getStorageDir(), storageConfig.getClusterID(), |
|         storageConfig.getLayoutVersion(), storageConfig.getScmId()); |
|     return true; |
|   } catch (IOException ioe) { |
|     LOG.error("Could not initialize SCM version file", ioe); |
|     return false; |
|   } |
| } |
| |
| private static InetSocketAddress getScmAddress(SCMHANodeDetails haDetails, |
| ConfigurationSource conf) throws IOException { |
| List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo( |
| conf); |
| Preconditions.checkNotNull(scmNodeInfoList, "scmNodeInfoList is null"); |
| |
| InetSocketAddress scmAddress = null; |
| if (SCMHAUtils.getScmServiceId(conf) != null) { |
| for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) { |
| if (haDetails.getLocalNodeDetails().getNodeId() != null |
| && haDetails.getLocalNodeDetails().getNodeId().equals( |
| scmNodeInfo.getNodeId())) { |
| scmAddress = |
| NetUtils.createSocketAddr(scmNodeInfo.getBlockClientAddress()); |
| } |
| } |
| } else { |
| // Get Local host and use scm client port |
| if (scmNodeInfoList.get(0).getBlockClientAddress() == null) { |
| LOG.error("SCM Address not able to figure out from config, finding " + |
| "hostname from InetAddress."); |
| scmAddress = |
| NetUtils.createSocketAddr(InetAddress.getLocalHost().getHostName(), |
| ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT); |
| } else { |
| scmAddress = NetUtils.createSocketAddr( |
| scmNodeInfoList.get(0).getBlockClientAddress()); |
| } |
| } |
| |
| |
| return scmAddress; |
| } |
| |
| /** |
|  * Creates the SCM metrics and stores them in the static {@code metrics} |
|  * field, replacing any previous value. |
|  */ |
| public static void initMetrics() { |
|   final SCMMetrics created = SCMMetrics.create(); |
|   metrics = created; |
| } |
| |
| /** |
|  * Return SCM metrics instance, creating and remembering one on first use. |
|  * |
|  * <p>The lazily created instance is stored in the static {@code metrics} |
|  * field. Without that, every call made before {@code initMetrics()} would |
|  * build a separate instance that is never stored, and {@code stop()}, |
|  * which only unregisters a non-null {@code metrics}, could not release it. |
|  * |
|  * @return the shared SCM metrics instance, never {@code null} unless |
|  *         {@code SCMMetrics.create()} returns {@code null} |
|  */ |
| public static SCMMetrics getMetrics() { |
|   if (metrics == null) { |
|     metrics = SCMMetrics.create(); |
|   } |
|   return metrics; |
| } |
| |
| /** |
|  * Returns the storage config of this SCM. |
|  */ |
| public SCMStorageConfig getScmStorageConfig() { |
|   return this.scmStorageConfig; |
| } |
| |
| /** |
|  * Returns the datanode protocol server of this SCM. |
|  */ |
| public SCMDatanodeProtocolServer getDatanodeProtocolServer() { |
|   return this.datanodeProtocolServer; |
| } |
| |
| /** |
|  * Returns the block protocol server of this SCM. |
|  */ |
| public SCMBlockProtocolServer getBlockProtocolServer() { |
|   return this.blockProtocolServer; |
| } |
| |
| /** |
|  * Returns the client protocol server of this SCM. |
|  */ |
| public SCMClientProtocolServer getClientProtocolServer() { |
|   return this.clientProtocolServer; |
| } |
| |
| /** |
|  * Returns the security protocol server of this SCM; callers in this class |
|  * check the result for {@code null} before using it. |
|  */ |
| public SCMSecurityProtocolServer getSecurityProtocolServer() { |
|   return this.securityProtocolServer; |
| } |
| |
| /** |
| * Initialize container reports cache that sent from datanodes. |
| */ |
| @SuppressWarnings("UnstableApiUsage") |
| private Cache<String, ContainerStat> buildContainerReportCache() { |
| return |
| CacheBuilder.newBuilder() |
| .expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS) |
| .maximumSize(Integer.MAX_VALUE) |
| .removalListener(( |
| RemovalListener<String, ContainerStat>) removalNotification -> { |
| synchronized (containerReportCache) { |
| ContainerStat stat = removalNotification.getValue(); |
| if (stat != null) { |
| // TODO: Are we doing the right thing here? |
| // remove invalid container report |
| metrics.decrContainerStat(stat); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Remove expired container stat entry for " + |
| "datanode: {}.", removalNotification.getKey()); |
| } |
| } |
| }) |
| .build(); |
| } |
| |
| /** |
|  * Registers this SCM as a JMX bean and remembers the returned bean name. |
|  */ |
| private void registerMXBean() { |
|   final Map<String, String> beanProperties = new HashMap<>(); |
|   beanProperties.put("component", "ServerRuntime"); |
|   this.scmInfoBeanName = HddsUtils.registerWithJmxProperties( |
|       "StorageContainerManager", |
|       "StorageContainerManagerInfo", |
|       beanProperties, |
|       this); |
| } |
| |
| /** |
|  * Creates the SCM container metrics for the given MXBean and stores them. |
|  * |
|  * @param scmMBean bean passed to {@code SCMContainerMetrics.create} |
|  */ |
| private void registerMetricsSource(SCMMXBean scmMBean) { |
|   this.scmContainerMetrics = SCMContainerMetrics.create(scmMBean); |
| } |
| |
| /** |
|  * Unregisters the JMX bean of this SCM, if one is registered, and clears |
|  * the remembered bean name. |
|  */ |
| private void unregisterMXBean() { |
|   if (this.scmInfoBeanName == null) { |
|     return; |
|   } |
|   MBeans.unregister(this.scmInfoBeanName); |
|   this.scmInfoBeanName = null; |
| } |
| |
| @VisibleForTesting |
| public ContainerInfo getContainerInfo(long containerID) throws |
| IOException { |
| return containerManager.getContainer(ContainerID.valueOf(containerID)); |
| } |
| |
| /** |
|  * Returns the client RPC address reported by the client protocol server. |
|  * |
|  * @return listen address of StorageLocation RPC server |
|  */ |
| @VisibleForTesting |
| public InetSocketAddress getClientRpcAddress() { |
|   final SCMClientProtocolServer clientServer = getClientProtocolServer(); |
|   return clientServer.getClientRpcAddress(); |
| } |
| |
| /** |
|  * Returns the client RPC port as a string, or "0" when no client RPC |
|  * address is available. |
|  */ |
| @Override |
| public String getClientRpcPort() { |
|   final InetSocketAddress clientAddress = getClientRpcAddress(); |
|   if (clientAddress == null) { |
|     return "0"; |
|   } |
|   return Integer.toString(clientAddress.getPort()); |
| } |
| |
| /** |
|  * Returns the block protocol RPC port as a string, or "0" when no block |
|  * RPC address is available. |
|  */ |
| public String getBlockProtocolRpcPort() { |
|   final InetSocketAddress blockAddress = |
|       getBlockProtocolServer().getBlockRpcAddress(); |
|   if (blockAddress == null) { |
|     return "0"; |
|   } |
|   return Integer.toString(blockAddress.getPort()); |
| } |
| |
| /** |
|  * Returns the security protocol RPC port as a string, or "0" when the |
|  * security protocol server reports no RPC address. |
|  */ |
| public String getSecurityProtocolRpcPort() { |
|   final InetSocketAddress securityAddress = |
|       getSecurityProtocolServer().getRpcAddress(); |
|   if (securityAddress == null) { |
|     return "0"; |
|   } |
|   return Integer.toString(securityAddress.getPort()); |
| } |
| |
| /** |
|  * Returns the datanode RPC address reported by the datanode protocol |
|  * server. |
|  * |
|  * @return Address where datanode are communicating. |
|  */ |
| @Override |
| public InetSocketAddress getDatanodeRpcAddress() { |
|   final SCMDatanodeProtocolServer datanodeServer = |
|       getDatanodeProtocolServer(); |
|   return datanodeServer.getDatanodeRpcAddress(); |
| } |
| |
| /** |
|  * Returns the details of the local SCM node taken from the HA node |
|  * details. |
|  */ |
| @Override |
| public SCMNodeDetails getScmNodeDetails() { |
|   return this.scmHANodeDetails.getLocalNodeDetails(); |
| } |
| |
| /** |
|  * Returns the HA node details of this SCM. |
|  */ |
| public SCMHANodeDetails getSCMHANodeDetails() { |
|   return this.scmHANodeDetails; |
| } |
| |
| /** |
|  * Returns the datanode RPC port as a string, or "0" when no datanode RPC |
|  * address is available. |
|  */ |
| @Override |
| public String getDatanodeRpcPort() { |
|   final InetSocketAddress datanodeAddress = getDatanodeRpcAddress(); |
|   if (datanodeAddress == null) { |
|     return "0"; |
|   } |
|   return Integer.toString(datanodeAddress.getPort()); |
| } |
| |
| /** |
| * Start service. |
| */ |
| @Override |
| public void start() throws IOException { |
| finalizationManager.runPrefinalizeStateActions(); |
| |
| if (LOG.isInfoEnabled()) { |
| LOG.info(buildRpcServerStartMessage( |
| "StorageContainerLocationProtocol RPC server", |
| getClientRpcAddress())); |
| } |
| |
| scmHAManager.start(); |
| startSecretManagerIfNecessary(); |
| |
| ms = HddsServerUtil |
| .initializeMetrics(configuration, "StorageContainerManager"); |
| |
| commandWatcherLeaseManager.start(); |
| getClientProtocolServer().start(); |
| |
| if (LOG.isInfoEnabled()) { |
| LOG.info(buildRpcServerStartMessage("ScmBlockLocationProtocol RPC " + |
| "server", getBlockProtocolServer().getBlockRpcAddress())); |
| } |
| getBlockProtocolServer().start(); |
| |
| // If HA is enabled, start datanode protocol server once leader is ready. |
| if (!scmStorageConfig.isSCMHAEnabled()) { |
| getDatanodeProtocolServer().start(); |
| } |
| if (getSecurityProtocolServer() != null) { |
| getSecurityProtocolServer().start(); |
| persistSCMCertificates(); |
| } |
| |
| scmBlockManager.start(); |
| |
| // Start jvm monitor |
| jvmPauseMonitor = new JvmPauseMonitor(); |
| jvmPauseMonitor.init(configuration); |
| jvmPauseMonitor.start(); |
| |
| try { |
| httpServer = new StorageContainerManagerHttpServer(configuration, this); |
| httpServer.start(); |
| } catch (Exception ex) { |
| // SCM HttpServer start-up failure should be non-fatal |
| LOG.error("SCM HttpServer failed to start.", ex); |
| } |
| |
| setStartTime(); |
| } |
| |
| /** Persist SCM certs to DB on bootstrap scm nodes. |
| * |
| * @throws IOException |
| */ |
| private void persistSCMCertificates() throws IOException { |
| // Fetch all CA's and persist during startup on bootstrap nodes. This |
| // is primarily being done to persist primary SCM Cert and Root CA. |
| // TODO: see if we can avoid doing this during every restart. |
| if (primaryScmNodeId != null && !primaryScmNodeId.equals( |
| scmStorageConfig.getScmId())) { |
| List<String> pemEncodedCerts = |
| scmCertificateClient.listCA(); |
| |
| // Write the primary SCM CA and Root CA during startup. |
| for (String cert : pemEncodedCerts) { |
| try { |
| X509Certificate x509Certificate = |
| CertificateCodec.getX509Certificate(cert); |
| if (certificateStore.getCertificateByID( |
| x509Certificate.getSerialNumber(), VALID_CERTS) == null) { |
| LOG.info("Persist certificate serialId {} on Scm Bootstrap Node " + |
| "{}", x509Certificate.getSerialNumber(), |
| scmStorageConfig.getScmId()); |
| certificateStore.storeValidScmCertificate( |
| x509Certificate.getSerialNumber(), x509Certificate); |
| } |
| } catch (CertificateException ex) { |
| LOG.error("Error while decoding CA Certificate", ex); |
| throw new IOException(ex); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Stop service. |
| */ |
| @Override |
| public void stop() { |
| try { |
| if (containerBalancer.isBalancerRunning()) { |
| LOG.info("Stopping Container Balancer service."); |
| // stop ContainerBalancer thread in this scm |
| containerBalancer.stop(); |
| } else { |
| LOG.info("Container Balancer is not running."); |
| } |
| } catch (Exception e) { |
| LOG.error("Failed to stop Container Balancer service.", e); |
| } |
| |
| try { |
| LOG.info("Stopping Replication Manager Service."); |
| replicationManager.stop(); |
| } catch (Exception ex) { |
| LOG.error("Replication manager service stop failed.", ex); |
| } |
| |
| try { |
| LOG.info("Stopping the Datanode Admin Monitor."); |
| scmDecommissionManager.stop(); |
| } catch (Exception ex) { |
| LOG.error("The Datanode Admin Monitor failed to stop", ex); |
| } |
| |
| try { |
| LOG.info("Stopping Lease Manager of the command watchers"); |
| commandWatcherLeaseManager.shutdown(); |
| } catch (Exception ex) { |
| LOG.error("Lease Manager of the command watchers stop failed"); |
| } |
| |
| try { |
| LOG.info("Stopping datanode service RPC server"); |
| getDatanodeProtocolServer().stop(); |
| |
| } catch (Exception ex) { |
| LOG.error("Storage Container Manager datanode RPC stop failed.", ex); |
| } |
| |
| try { |
| LOG.info("Stopping block service RPC server"); |
| getBlockProtocolServer().stop(); |
| } catch (Exception ex) { |
| LOG.error("Storage Container Manager blockRpcServer stop failed.", ex); |
| } |
| |
| try { |
| LOG.info("Stopping the StorageContainerLocationProtocol RPC server"); |
| getClientProtocolServer().stop(); |
| } catch (Exception ex) { |
| LOG.error("Storage Container Manager clientRpcServer stop failed.", ex); |
| } |
| |
| try { |
| LOG.info("Stopping Storage Container Manager HTTP server."); |
| httpServer.stop(); |
| } catch (Exception ex) { |
| LOG.error("Storage Container Manager HTTP server stop failed.", ex); |
| } |
| |
| LOG.info("Stopping SCM LayoutVersionManager Service."); |
| scmLayoutVersionManager.close(); |
| |
| if (getSecurityProtocolServer() != null) { |
| getSecurityProtocolServer().stop(); |
| } |
| |
| try { |
| LOG.info("Stopping Block Manager Service."); |
| scmBlockManager.stop(); |
| } catch (Exception ex) { |
| LOG.error("SCM block manager service stop failed.", ex); |
| } |
| |
| if (containerReportCache != null) { |
| containerReportCache.invalidateAll(); |
| containerReportCache.cleanUp(); |
| } |
| |
| stopSecretManager(); |
| |
| if (metrics != null) { |
| metrics.unRegister(); |
| } |
| |
| unregisterMXBean(); |
| if (scmContainerMetrics != null) { |
| scmContainerMetrics.unRegister(); |
| } |
| if (placementMetrics != null) { |
| placementMetrics.unRegister(); |
| } |
| |
| // Event queue must be stopped before the DB store is closed at the end. |
| try { |
| LOG.info("Stopping SCM Event Queue."); |
| eventQueue.close(); |
| } catch (Exception ex) { |
| LOG.error("SCM Event Queue stop failed", ex); |
| } |
| |
| if (jvmPauseMonitor != null) { |
| jvmPauseMonitor.stop(); |
| } |
| |
| try { |
| LOG.info("Stopping SCM HA services."); |
| scmHAManager.stop(); |
| } catch (Exception ex) { |
| LOG.error("SCM HA Manager stop failed", ex); |
| } |
| |
| IOUtils.cleanupWithLogger(LOG, containerManager); |
| IOUtils.cleanupWithLogger(LOG, pipelineManager); |
| |
| try { |
| LOG.info("Stopping SCM MetadataStore."); |
| scmMetadataStore.stop(); |
| } catch (Exception ex) { |
| LOG.error("SCM Metadata store stop failed", ex); |
| } |
| |
| if (ms != null) { |
| ms.stop(); |
| } |
| |
| scmSafeModeManager.stop(); |
| } |
| |
| /** |
| * Runs {@link #stop()} and then calls {@code ExitUtils.terminate} with |
| * exit status 1. |
| * |
| * @param message the message handed to {@code ExitUtils.terminate}. |
| */ |
| @Override |
| public void shutDown(String message) { |
| final int exitStatus = 1; |
| this.stop(); |
| ExitUtils.terminate(exitStatus, message, LOG); |
| } |
| |
| /** |
| * Joins the block, client and datanode protocol servers in that order, |
| * followed by the security protocol server when one is present. |
| * |
| * <p>If the calling thread is interrupted while joining, its interrupt |
| * flag is set again, the event is logged and the method returns. |
| */ |
| @Override |
| public void join() { |
| try { |
| getBlockProtocolServer().join(); |
| getClientProtocolServer().join(); |
| getDatanodeProtocolServer().join(); |
| // The security protocol server may be absent. |
| if (getSecurityProtocolServer() != null) { |
| getSecurityProtocolServer().join(); |
| } |
| } catch (InterruptedException interrupted) { |
| // Re-assert the interrupt so callers can still observe it. |
| Thread.currentThread().interrupt(); |
| LOG.info("Interrupted during StorageContainerManager join."); |
| } |
| } |
| |
| /** |
| * Asks the node manager how many datanodes are in the given state. |
| * |
| * @param nodestate the node state to count, e.g. healthy or dead. |
| * @return the count reported by the node manager. |
| */ |
| public int getNodeCount(NodeState nodestate) { |
| // TODO - decomm - this probably needs to accept opState and health |
| // The first argument is passed as null here. |
| final int count = scmNodeManager.getNodeCount(null, nodestate); |
| return count; |
| } |
| |
| /** |
| * Returns the node decommission manager used by this SCM. |
| * |
| * @return the {@code NodeDecommissionManager} held by this instance. |
| */ |
| public NodeDecommissionManager getScmDecommissionManager() { |
| return this.scmDecommissionManager; |
| } |
| |
| /** |
| * Returns the HA manager of this SCM. |
| * |
| * @return the {@code SCMHAManager} held by this instance. |
| */ |
| public SCMHAManager getScmHAManager() { |
| return this.scmHAManager; |
| } |
| |
| /** |
| * Returns the writable container factory of this SCM. |
| * |
| * @return the {@code WritableContainerFactory} held by this instance. |
| */ |
| public WritableContainerFactory getWritableContainerFactory() { |
| return this.writableContainerFactory; |
| } |
| |
| /** |
| * Returns the container manager of this SCM. |
| * |
| * @return the {@code ContainerManager} held by this instance. |
| */ |
| @VisibleForTesting |
| @Override |
| public ContainerManager getContainerManager() { |
| return this.containerManager; |
| } |
| |
| /** |
| * Returns the node manager of this SCM. |
| * |
| * @return the {@code NodeManager} held by this instance. |
| */ |
| @VisibleForTesting |
| @Override |
| public NodeManager getScmNodeManager() { |
| return this.scmNodeManager; |
| } |
| |
| /** |
| * Returns the pipeline manager of this SCM. |
| * |
| * @return the {@code PipelineManager} held by this instance. |
| */ |
| @VisibleForTesting |
| @Override |
| public PipelineManager getPipelineManager() { |
| return this.pipelineManager; |
| } |
| |
| /** |
| * Returns the block manager of this SCM. |
| * |
| * @return the {@code BlockManager} held by this instance. |
| */ |
| @VisibleForTesting |
| @Override |
| public BlockManager getScmBlockManager() { |
| return this.scmBlockManager; |
| } |
| |
| /** |
| * Returns the safe mode manager of this SCM. |
| * |
| * @return the {@code SCMSafeModeManager} held by this instance. |
| */ |
| @VisibleForTesting |
| public SCMSafeModeManager getScmSafeModeManager() { |
| return this.scmSafeModeManager; |
| } |
| |
| /** |
| * Returns the replication manager of this SCM. |
| * |
| * @return the {@code ReplicationManager} held by this instance. |
| */ |
| @Override |
| public ReplicationManager getReplicationManager() { |
| return this.replicationManager; |
| } |
| |
| /** |
| * Returns the container placement policy of this SCM. |
| * |
| * @return the {@code PlacementPolicy} held by this instance. |
| */ |
| public PlacementPolicy getContainerPlacementPolicy() { |
| return this.containerPlacementPolicy; |
| } |
| |
| /** |
| * Returns the placement policy validate proxy of this SCM. |
| * |
| * @return the {@code PlacementPolicyValidateProxy} held by this instance. |
| */ |
| public PlacementPolicyValidateProxy getPlacementPolicyValidateProxy() { |
| return this.placementPolicyValidateProxy; |
| } |
| |
| /** |
| * Returns the container balancer of this SCM. |
| * |
| * @return the {@code ContainerBalancer} held by this instance. |
| */ |
| @VisibleForTesting |
| @Override |
| public ContainerBalancer getContainerBalancer() { |
| return this.containerBalancer; |
| } |
| |
| /** |
| * Tells whether this SCM is the leader and ready to accept requests. |
| * |
| * @return {@code true} when SCM HA is disabled; otherwise the |
| * leader-ready flag reported by the Ratis server division of this SCM. |
| */ |
| public boolean checkLeader() { |
| if (SCMHAUtils.isSCMHAEnabled(configuration)) { |
| // FOR HA setup, the node has to be the leader and ready to serve |
| // requests. |
| return getScmHAManager().getRatisServer().getDivision().getInfo() |
| .isLeaderReady(); |
| } |
| // For NON-HA setup, the node will always be the leader |
| return true; |
| } |
| |
| /** |
| * Verifies that the given user is an SCM administrator. |
| * |
| * <p>The check passes when the user is {@code null}, or when the |
| * configured admin names contain the user's name, the user's short name |
| * or the administrators wildcard. |
| * |
| * @param remoteUser the user to check; may be {@code null}. |
| * @throws IOException an {@code AccessControlException} when the user is |
| * not an administrator. |
| */ |
| public void checkAdminAccess(UserGroupInformation remoteUser) |
| throws IOException { |
| if (remoteUser == null) { |
| return; |
| } |
| // Evaluated left to right; stops at the first match. |
| final boolean isAdmin = |
| scmAdminUsernames.contains(remoteUser.getUserName()) |
| || scmAdminUsernames.contains(remoteUser.getShortUserName()) |
| || scmAdminUsernames.contains(OZONE_ADMINISTRATORS_WILDCARD); |
| if (!isAdmin) { |
| throw new AccessControlException( |
| "Access denied for user " + remoteUser.getUserName() + |
| ". Superuser privilege is required."); |
| } |
| } |
| |
| /** |
| * Removes the cached container stat entry of one datanode. |
| * |
| * @param datanodeUuid key of the cache entry to invalidate. |
| */ |
| public void removeContainerReport(String datanodeUuid) { |
| // Done while holding the cache's monitor, as the other accessors do. |
| synchronized (this.containerReportCache) { |
| this.containerReportCache.invalidate(datanodeUuid); |
| } |
| } |
| |
| /** |
| * Looks up the cached container stat of one datanode. |
| * |
| * @param datanodeUuid key of the cache entry to read. |
| * @return the cached {@code ContainerStat}, or {@code null} when the |
| * cache holds no entry for that key. |
| */ |
| public ContainerStat getContainerReport(String datanodeUuid) { |
| synchronized (containerReportCache) { |
| return containerReportCache.getIfPresent(datanodeUuid); |
| } |
| } |
| |
| /** |
| * Exposes the container stat cache as a map view. Changes made through |
| * the returned map directly affect the cache. |
| * |
| * @return the {@code asMap()} view of the container report cache. |
| */ |
| public ConcurrentMap<String, ContainerStat> getContainerReportCache() { |
| return this.containerReportCache.asMap(); |
| } |
| |
| /** |
| * Builds a snapshot of the container report cache in which every cached |
| * stat is rendered through {@code toJsonString()}. |
| * |
| * @return a new map from cache key to the JSON string of its stat. |
| */ |
| @Override |
| public Map<String, String> getContainerReport() { |
| final Map<String, String> jsonByDatanode = new HashMap<>(); |
| // Copy the entries while holding the cache's monitor. |
| synchronized (containerReportCache) { |
| for (Map.Entry<String, ContainerStat> cached : |
| containerReportCache.asMap().entrySet()) { |
| jsonByDatanode.put(cached.getKey(), cached.getValue().toJsonString()); |
| } |
| } |
| return jsonByDatanode; |
| } |
| |
| /** |
| * Returns the live safe mode container threshold. Delegates to |
| * {@link #getCurrentContainerThreshold()}. |
| * |
| * @return the current container threshold as a double. |
| */ |
| @Override |
| public double getSafeModeCurrentContainerThreshold() { |
| return this.getCurrentContainerThreshold(); |
| } |
| |
| /** |
| * Reports the safe mode status held by the safe mode manager. |
| * |
| * @return the value of {@code getInSafeMode()} on the safe mode manager. |
| */ |
| @Override |
| public boolean isInSafeMode() { |
| return this.scmSafeModeManager.getInSafeMode(); |
| } |
| |
| /** |
| * Returns the event queue of this SCM as an event publisher. |
| * |
| * @return the event queue held by this instance. |
| */ |
| public EventPublisher getEventQueue() { |
| return this.eventQueue; |
| } |
| |
| /** |
| * Returns the SCM context. |
| * |
| * @return the {@code SCMContext} held by this instance. |
| */ |
| public SCMContext getScmContext() { |
| return this.scmContext; |
| } |
| |
| /** |
| * Returns the sequence id generator. |
| * |
| * @return the {@code SequenceIdGenerator} held by this instance. |
| */ |
| public SequenceIdGenerator getSequenceIdGen() { |
| return this.sequenceIdGen; |
| } |
| |
| /** |
| * Returns the SCM service manager. |
| * |
| * @return the {@code SCMServiceManager} held by this instance. |
| */ |
| public SCMServiceManager getSCMServiceManager() { |
| return this.serviceManager; |
| } |
| |
| /** |
| * Forces SCM out of safe mode by calling {@code exitSafeMode} on the |
| * safe mode manager with this SCM's event queue. |
| * |
| * @return always {@code true}. |
| */ |
| public boolean exitSafeMode() { |
| this.scmSafeModeManager.exitSafeMode(this.eventQueue); |
| return true; |
| } |
| |
| /** |
| * Returns the current container threshold reported by the safe mode |
| * manager. |
| * |
| * @return the current container threshold. |
| */ |
| @VisibleForTesting |
| public double getCurrentContainerThreshold() { |
| return this.scmSafeModeManager.getCurrentContainerThreshold(); |
| } |
| |
| /** |
| * Counts containers per lifecycle state. |
| * |
| * @return a new map with one entry per {@code LifeCycleState} value, |
| * keyed by the state's string form and holding the count reported by |
| * the container manager. |
| */ |
| @Override |
| public Map<String, Integer> getContainerStateCount() { |
| final Map<String, Integer> countByState = new HashMap<>(); |
| for (HddsProtos.LifeCycleState lifeCycleState |
| : HddsProtos.LifeCycleState.values()) { |
| countByState.put(lifeCycleState.toString(), |
| containerManager.getContainerStateCount(lifeCycleState)); |
| } |
| return countByState; |
| } |
| |
| /** |
| * Returns the SCM metadata store. |
| * |
| * @return the {@code SCMMetadataStore} held by this instance. |
| */ |
| public SCMMetadataStore getScmMetadataStore() { |
| return this.scmMetadataStore; |
| } |
| |
| /** |
| * Returns the network topology of the SCM cluster. |
| * |
| * @return the {@code NetworkTopology} held by this instance. |
| */ |
| public NetworkTopology getClusterMap() { |
| return clusterMap; |
| } |
| |
| /** |
| * Returns the stateful service state manager. |
| * |
| * @return the {@code StatefulServiceStateManager} held by this instance. |
| */ |
| public StatefulServiceStateManager getStatefulServiceStateManager() { |
| return this.statefulServiceStateManager; |
| } |
| |
| /** |
| * Returns the status of all safe mode rules as reported by the safe |
| * mode manager. |
| * |
| * @return map from rule name to its status pair. |
| */ |
| public Map<String, Pair<Boolean, String>> getRuleStatus() { |
| return this.scmSafeModeManager.getRuleStatus(); |
| } |
| |
| /** |
| * Converts the safe mode rule statuses into string arrays. |
| * |
| * @return a new map from rule name to a two-element array: index 0 is |
| * the right element of the rule's status pair, index 1 is the string |
| * form of the left (boolean) element. |
| */ |
| @Override |
| public Map<String, String[]> getSafeModeRuleStatus() { |
| final Map<String, String[]> statusByRule = new HashMap<>(); |
| for (Map.Entry<String, Pair<Boolean, String>> rule : |
| scmSafeModeManager.getRuleStatus().entrySet()) { |
| final Pair<Boolean, String> ruleStatus = rule.getValue(); |
| statusByRule.put(rule.getKey(), new String[] { |
| ruleStatus.getRight(), ruleStatus.getLeft().toString()}); |
| } |
| return statusByRule; |
| } |
| |
| /** |
| * Returns the pipeline choose policy of this SCM. |
| * |
| * @return the {@code PipelineChoosePolicy} held by this instance. |
| */ |
| public PipelineChoosePolicy getPipelineChoosePolicy() { |
| return pipelineChoosePolicy; |
| } |
| |
| /** |
| * Returns the SCM id read from the SCM storage config. |
| * |
| * @return the SCM id. |
| */ |
| @Override |
| public String getScmId() { |
| return this.getScmStorageConfig().getScmId(); |
| } |
| |
| /** |
| * Returns the cluster id read from the SCM storage config. |
| * |
| * @return the cluster id. |
| */ |
| @Override |
| public String getClusterId() { |
| return this.getScmStorageConfig().getClusterID(); |
| } |
| |
| /** |
| * Returns the layout version manager of this SCM. |
| * |
| * @return the {@code HDDSLayoutVersionManager} held by this instance. |
| */ |
| public HDDSLayoutVersionManager getLayoutVersionManager() { |
| return this.scmLayoutVersionManager; |
| } |
| |
| /** |
| * Returns the finalization manager of this SCM. |
| * |
| * @return the {@code FinalizationManager} held by this instance. |
| */ |
| public FinalizationManager getFinalizationManager() { |
| return this.finalizationManager; |
| } |
| |
| /** |
| * Returns the node id of this SCM, taken from the local node details of |
| * the SCM HA node details. |
| * |
| * @return the local node id. |
| */ |
| public String getSCMNodeId() { |
| final SCMNodeDetails localNode = scmHANodeDetails.getLocalNodeDetails(); |
| return localNode.getNodeId(); |
| } |
| |
| /** |
| * Starts the container token manager when security and container tokens |
| * are both enabled, a token manager exists, and it is not yet running. |
| */ |
| private void startSecretManagerIfNecessary() { |
| // Nothing to do unless all three preconditions hold. |
| if (!securityConfig.isSecurityEnabled() |
| || !securityConfig.isContainerTokenEnabled() |
| || containerTokenMgr == null) { |
| return; |
| } |
| if (!containerTokenMgr.isRunning()) { |
| startSecretManager(); |
| } |
| } |
| |
| private void startSecretManager() { |
| try { |
| scmCertificateClient.assertValidKeysAndCertificate(); |
| } catch (OzoneSecurityException e) { |
| LOG.error("Unable to read key pair.", e); |
| throw new UncheckedIOException(e); |
| } |
| try { |
| LOG.info("Starting token manager"); |
| containerTokenMgr.start(scmCertificateClient); |
| } catch (IOException e) { |
| // Unable to start secret manager. |
| LOG.error("Error starting block token secret manager.", e); |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| private void stopSecretManager() { |
| if (containerTokenMgr != null) { |
| LOG.info("Stopping block token manager."); |
| try { |
| containerTokenMgr.stop(); |
| } catch (IOException e) { |
| LOG.error("Failed to stop block token manager", e); |
| } |
| } |
| } |
| |
| /** |
| * Returns the generator to use for container tokens. |
| * |
| * @return the container token manager when one exists, otherwise |
| * {@code ContainerTokenGenerator.DISABLED}. |
| */ |
| public ContainerTokenGenerator getContainerTokenGenerator() { |
| if (containerTokenMgr == null) { |
| return ContainerTokenGenerator.DISABLED; |
| } |
| return containerTokenMgr; |
| } |
| |
| /** |
| * Describes the Ratis roles of this SCM. |
| * |
| * @return the formatted Ratis roles of the HA manager's Ratis server, or |
| * {@code "STANDALONE"} when there is no Ratis server. |
| * @throws IOException declared by the signature; presumably raised while |
| * reading the Ratis roles (not visible here). |
| */ |
| @Override |
| public String getScmRatisRoles() throws IOException { |
| final SCMRatisServer ratisServer = getScmHAManager().getRatisServer(); |
| if (ratisServer == null) { |
| return "STANDALONE"; |
| } |
| return HddsUtils.format(ratisServer.getRatisRoles()); |
| } |
| |
| /** |
| * Resolves the primordial SCM node. |
| * |
| * @return {@code null} when SCM HA is disabled. Otherwise the configured |
| * primordial SCM value; when that value is one of the configured SCM node |
| * ids and matches a peer or the local node, that node's hostname is |
| * returned instead. |
| */ |
| @Override |
| public String getPrimordialNode() { |
| if (!SCMHAUtils.isSCMHAEnabled(configuration)) { |
| return null; |
| } |
| final String primordialNode = SCMHAUtils.getPrimordialSCM(configuration); |
| // primordialNode can be nodeId too . If it is then return hostname. |
| if (!SCMHAUtils.getSCMNodeIds(configuration).contains(primordialNode)) { |
| return primordialNode; |
| } |
| // Search the peers first, then the local node, for the matching id. |
| final List<SCMNodeDetails> candidates = |
| new ArrayList<>(scmHANodeDetails.getPeerNodeDetails()); |
| candidates.add(getSCMHANodeDetails().getLocalNodeDetails()); |
| for (SCMNodeDetails candidate : candidates) { |
| if (candidate.getNodeId().equals(primordialNode)) { |
| return candidate.getHostName(); |
| } |
| } |
| // No node carries that id; fall back to the configured value. |
| return primordialNode; |
| } |
| |
| /** |
| * Returns the SCM Ratis directory resolved from the configuration. |
| * |
| * @return the value of {@code SCMHAUtils.getSCMRatisDirectory}. |
| */ |
| @Override |
| public String getRatisLogDirectory() { |
| final String ratisDirectory = |
| SCMHAUtils.getSCMRatisDirectory(configuration); |
| return ratisDirectory; |
| } |
| |
| /** |
| * Returns the SCM DB directory resolved from the configuration, in its |
| * string form. |
| * |
| * @return {@code String.valueOf} of {@code ServerUtils.getScmDbDir}. |
| */ |
| @Override |
| public String getRocksDbDirectory() { |
| final String dbDirectory = |
| String.valueOf(ServerUtils.getScmDbDir(configuration)); |
| return dbDirectory; |
| } |
| |
| } |