| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.app; |
| |
| import static java.util.concurrent.CompletableFuture.completedFuture; |
| |
| import com.typesafe.config.Config; |
| import com.typesafe.config.ConfigFactory; |
| import java.io.IOException; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.ServiceLoader; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.function.BiPredicate; |
| import java.util.function.Consumer; |
| import java.util.function.LongFunction; |
| import java.util.function.LongSupplier; |
| import java.util.function.Supplier; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgnitionManager; |
| import org.apache.ignite.client.handler.ClientHandlerMetricSource; |
| import org.apache.ignite.client.handler.ClientHandlerModule; |
| import org.apache.ignite.compute.IgniteCompute; |
| import org.apache.ignite.configuration.ConfigurationDynamicDefaultsPatcher; |
| import org.apache.ignite.configuration.ConfigurationModule; |
| import org.apache.ignite.internal.catalog.CatalogManager; |
| import org.apache.ignite.internal.catalog.CatalogManagerImpl; |
| import org.apache.ignite.internal.catalog.ClockWaiter; |
| import org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration; |
| import org.apache.ignite.internal.catalog.storage.UpdateLogImpl; |
| import org.apache.ignite.internal.cluster.management.ClusterInitializer; |
| import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; |
| import org.apache.ignite.internal.cluster.management.NodeAttributesCollector; |
| import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration; |
| import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration; |
| import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage; |
| import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage; |
| import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; |
| import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; |
| import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; |
| import org.apache.ignite.internal.component.RestAddressReporter; |
| import org.apache.ignite.internal.components.LongJvmPauseDetector; |
| import org.apache.ignite.internal.compute.ComputeComponent; |
| import org.apache.ignite.internal.compute.ComputeComponentImpl; |
| import org.apache.ignite.internal.compute.IgniteComputeImpl; |
| import org.apache.ignite.internal.compute.configuration.ComputeConfiguration; |
| import org.apache.ignite.internal.compute.loader.JobClassLoaderFactory; |
| import org.apache.ignite.internal.compute.loader.JobContextManager; |
| import org.apache.ignite.internal.compute.queue.ComputeExecutorImpl; |
| import org.apache.ignite.internal.configuration.ConfigurationDynamicDefaultsPatcherImpl; |
| import org.apache.ignite.internal.configuration.ConfigurationManager; |
| import org.apache.ignite.internal.configuration.ConfigurationModules; |
| import org.apache.ignite.internal.configuration.ConfigurationRegistry; |
| import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator; |
| import org.apache.ignite.internal.configuration.JdbcPortProviderImpl; |
| import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider; |
| import org.apache.ignite.internal.configuration.hocon.HoconConverter; |
| import org.apache.ignite.internal.configuration.storage.ConfigurationStorage; |
| import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage; |
| import org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage; |
| import org.apache.ignite.internal.configuration.tree.ConfigurationSource; |
| import org.apache.ignite.internal.configuration.validation.ConfigurationValidator; |
| import org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl; |
| import org.apache.ignite.internal.deployunit.DeploymentManagerImpl; |
| import org.apache.ignite.internal.deployunit.IgniteDeployment; |
| import org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration; |
| import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl; |
| import org.apache.ignite.internal.distributionzones.DistributionZoneManager; |
| import org.apache.ignite.internal.hlc.HybridClock; |
| import org.apache.ignite.internal.hlc.HybridClockImpl; |
| import org.apache.ignite.internal.index.IndexBuildingManager; |
| import org.apache.ignite.internal.index.IndexManager; |
| import org.apache.ignite.internal.lang.IgniteInternalException; |
| import org.apache.ignite.internal.lang.NodeStoppingException; |
| import org.apache.ignite.internal.logger.IgniteLogger; |
| import org.apache.ignite.internal.logger.Loggers; |
| import org.apache.ignite.internal.metastorage.MetaStorageManager; |
| import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; |
| import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; |
| import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; |
| import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; |
| import org.apache.ignite.internal.metrics.MetricManager; |
| import org.apache.ignite.internal.metrics.configuration.MetricConfiguration; |
| import org.apache.ignite.internal.metrics.sources.JvmMetricSource; |
| import org.apache.ignite.internal.network.configuration.NetworkConfiguration; |
| import org.apache.ignite.internal.network.configuration.NetworkConfigurationSchema; |
| import org.apache.ignite.internal.network.recovery.VaultStateIds; |
| import org.apache.ignite.internal.placementdriver.PlacementDriver; |
| import org.apache.ignite.internal.placementdriver.PlacementDriverManager; |
| import org.apache.ignite.internal.raft.Loza; |
| import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; |
| import org.apache.ignite.internal.raft.configuration.RaftConfiguration; |
| import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator; |
| import org.apache.ignite.internal.replicator.ReplicaManager; |
| import org.apache.ignite.internal.replicator.ReplicaService; |
| import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration; |
| import org.apache.ignite.internal.rest.RestComponent; |
| import org.apache.ignite.internal.rest.RestFactory; |
| import org.apache.ignite.internal.rest.authentication.AuthenticationProviderFactory; |
| import org.apache.ignite.internal.rest.cluster.ClusterManagementRestFactory; |
| import org.apache.ignite.internal.rest.configuration.PresentationsFactory; |
| import org.apache.ignite.internal.rest.configuration.RestConfiguration; |
| import org.apache.ignite.internal.rest.deployment.CodeDeploymentRestFactory; |
| import org.apache.ignite.internal.rest.metrics.MetricRestFactory; |
| import org.apache.ignite.internal.rest.node.NodeManagementRestFactory; |
| import org.apache.ignite.internal.schema.SchemaManager; |
| import org.apache.ignite.internal.schema.configuration.GcConfiguration; |
| import org.apache.ignite.internal.security.authentication.AuthenticationManager; |
| import org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl; |
| import org.apache.ignite.internal.security.configuration.SecurityConfiguration; |
| import org.apache.ignite.internal.sql.api.IgniteSqlImpl; |
| import org.apache.ignite.internal.sql.engine.QueryProcessor; |
| import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; |
| import org.apache.ignite.internal.storage.DataStorageManager; |
| import org.apache.ignite.internal.storage.DataStorageModule; |
| import org.apache.ignite.internal.storage.DataStorageModules; |
| import org.apache.ignite.internal.storage.configurations.StoragesConfiguration; |
| import org.apache.ignite.internal.systemview.SystemViewManagerImpl; |
| import org.apache.ignite.internal.systemview.api.SystemViewManager; |
| import org.apache.ignite.internal.table.distributed.TableManager; |
| import org.apache.ignite.internal.table.distributed.TableMessageGroup; |
| import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; |
| import org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnActionRequest; |
| import org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries; |
| import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; |
| import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl; |
| import org.apache.ignite.internal.thread.NamedThreadFactory; |
| import org.apache.ignite.internal.tx.HybridTimestampTracker; |
| import org.apache.ignite.internal.tx.LockManager; |
| import org.apache.ignite.internal.tx.TxManager; |
| import org.apache.ignite.internal.tx.impl.HeapLockManager; |
| import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; |
| import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; |
| import org.apache.ignite.internal.tx.impl.TxManagerImpl; |
| import org.apache.ignite.internal.tx.message.TxMessageGroup; |
| import org.apache.ignite.internal.vault.VaultManager; |
| import org.apache.ignite.internal.vault.VaultService; |
| import org.apache.ignite.internal.vault.persistence.PersistentVaultService; |
| import org.apache.ignite.lang.IgniteException; |
| import org.apache.ignite.network.ClusterNode; |
| import org.apache.ignite.network.ClusterService; |
| import org.apache.ignite.network.DefaultMessagingService; |
| import org.apache.ignite.network.MessageSerializationRegistryImpl; |
| import org.apache.ignite.network.NettyBootstrapFactory; |
| import org.apache.ignite.network.NetworkAddress; |
| import org.apache.ignite.network.NetworkMessage; |
| import org.apache.ignite.network.NodeMetadata; |
| import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory; |
| import org.apache.ignite.network.serialization.MessageSerializationRegistry; |
| import org.apache.ignite.network.serialization.SerializationRegistryServiceLoader; |
| import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; |
| import org.apache.ignite.sql.IgniteSql; |
| import org.apache.ignite.table.manager.IgniteTables; |
| import org.apache.ignite.tx.IgniteTransactions; |
| import org.jetbrains.annotations.Nullable; |
| import org.jetbrains.annotations.TestOnly; |
| |
| /** |
| * Ignite internal implementation. |
| */ |
| public class IgniteImpl implements Ignite { |
| /** The logger. */ |
| private static final IgniteLogger LOG = Loggers.forClass(IgniteImpl.class); |
| |
| /** |
| * Path to the persistent storage used by the {@link VaultService} component. |
| */ |
| private static final Path VAULT_DB_PATH = Paths.get("vault"); |
| |
| /** |
| * Path to the persistent storage used by the {@link MetaStorageManager} component. |
| */ |
| private static final Path METASTORAGE_DB_PATH = Paths.get("metastorage"); |
| |
| /** |
| * Path to the persistent storage used by the {@link ClusterManagementGroupManager} component. |
| */ |
| private static final Path CMG_DB_PATH = Paths.get("cmg"); |
| |
| /** |
| * Path for the partitions persistent storage. |
| */ |
| private static final Path PARTITIONS_STORE_PATH = Paths.get("db"); |
| |
| /** Ignite node name. */ |
| private final String name; |
| |
| /** Lifecycle manager. */ |
| private final LifecycleManager lifecycleManager; |
| |
| /** Vault manager. */ |
| private final VaultManager vaultMgr; |
| |
| /** Sql query engine. */ |
| private final SqlQueryProcessor qryEngine; |
| |
| /** Sql API facade. */ |
| private final IgniteSqlImpl sql; |
| |
| /** Configuration manager that handles node (local) configuration. */ |
| private final ConfigurationManager nodeCfgMgr; |
| |
| /** Cluster service (cluster network manager). */ |
| private final ClusterService clusterSvc; |
| |
| private final ComputeComponent computeComponent; |
| |
| /** Netty bootstrap factory. */ |
| private final NettyBootstrapFactory nettyBootstrapFactory; |
| |
| /** Raft manager. */ |
| private final Loza raftMgr; |
| |
| /** Meta storage manager. */ |
| private final MetaStorageManagerImpl metaStorageMgr; |
| |
| /** Placement driver manager. */ |
| private final PlacementDriverManager placementDriverMgr; |
| |
| /** Configuration manager that handles cluster (distributed) configuration. */ |
| private final ConfigurationManager clusterCfgMgr; |
| |
| /** Cluster initializer. */ |
| private final ClusterInitializer clusterInitializer; |
| |
| /** Replica manager. */ |
| private final ReplicaManager replicaMgr; |
| |
| /** Transactions manager. */ |
| private final TxManager txManager; |
| |
| /** Distributed table manager. */ |
| private final TableManager distributedTblMgr; |
| |
| private final IndexManager indexManager; |
| |
| /** Rest module. */ |
| private final RestComponent restComponent; |
| |
| private final ClusterStateStorage clusterStateStorage; |
| |
| private final ClusterManagementGroupManager cmgMgr; |
| |
| private final LogicalTopologyService logicalTopologyService; |
| |
| /** Client handler module. */ |
| private final ClientHandlerModule clientHandlerModule; |
| |
| /** Distributed configuration storage. */ |
| private final ConfigurationStorage cfgStorage; |
| |
| /** Compute. */ |
| private final IgniteCompute compute; |
| |
| /** JVM pause detector. */ |
| private final LongJvmPauseDetector longJvmPauseDetector; |
| |
| /** Data storage manager. */ |
| private final DataStorageManager dataStorageMgr; |
| |
| /** Schema manager. */ |
| private final SchemaManager schemaManager; |
| |
| /** Metric manager. */ |
| private final MetricManager metricManager; |
| |
| private final IgniteDeployment deploymentManager; |
| |
| private final DistributionZoneManager distributionZoneManager; |
| |
| /** Creator for volatile {@link org.apache.ignite.internal.raft.storage.LogStorageFactory} instances. */ |
| private final VolatileLogStorageFactoryCreator volatileLogStorageFactoryCreator; |
| |
| /** A hybrid logical clock. */ |
| private final HybridClock clock; |
| |
| private final ClockWaiter clockWaiter; |
| |
| private final OutgoingSnapshotsManager outgoingSnapshotsManager; |
| |
| private final RestAddressReporter restAddressReporter; |
| |
| private final CatalogManager catalogManager; |
| |
| private final AuthenticationManager authenticationManager; |
| |
| /** Timestamp tracker for embedded transactions. */ |
| private final HybridTimestampTracker observableTimestampTracker = new HybridTimestampTracker(); |
| |
| /** System views manager. */ |
| private final SystemViewManagerImpl systemViewManager; |
| |
| /** Index building manager. */ |
| private final IndexBuildingManager indexBuildingManager; |
| |
| /** |
| * The Constructor. |
| * |
| * @param name Ignite node name. |
| * @param configPath Path to node configuration in the HOCON format. |
| * @param workDir Work directory for the started node. Must not be {@code null}. |
| * @param serviceProviderClassLoader The class loader to be used to load provider-configuration files and provider classes, or |
| * {@code null} if the system class loader (or, failing that the bootstrap class loader) is to be used. |
| */ |
| IgniteImpl(String name, Path configPath, Path workDir, @Nullable ClassLoader serviceProviderClassLoader) { |
| this.name = name; |
| |
| longJvmPauseDetector = new LongJvmPauseDetector(name); |
| |
| lifecycleManager = new LifecycleManager(name); |
| |
| vaultMgr = createVault(name, workDir); |
| |
| metricManager = new MetricManager(); |
| |
| ConfigurationModules modules = loadConfigurationModules(serviceProviderClassLoader); |
| |
| ConfigurationTreeGenerator localConfigurationGenerator = new ConfigurationTreeGenerator( |
| modules.local().rootKeys(), |
| modules.local().schemaExtensions(), |
| modules.local().polymorphicSchemaExtensions() |
| ); |
| |
| LocalFileConfigurationStorage localFileConfigurationStorage = new LocalFileConfigurationStorage( |
| configPath, |
| localConfigurationGenerator |
| ); |
| |
| ConfigurationValidator localConfigurationValidator = |
| ConfigurationValidatorImpl.withDefaultValidators(localConfigurationGenerator, modules.local().validators()); |
| |
| nodeCfgMgr = new ConfigurationManager( |
| modules.local().rootKeys(), |
| localFileConfigurationStorage, |
| localConfigurationGenerator, |
| localConfigurationValidator |
| ); |
| |
| ConfigurationRegistry nodeConfigRegistry = nodeCfgMgr.configurationRegistry(); |
| |
| NetworkConfiguration networkConfiguration = nodeConfigRegistry.getConfiguration(NetworkConfiguration.KEY); |
| |
| MessageSerializationRegistry serializationRegistry = createSerializationRegistry(serviceProviderClassLoader); |
| |
| nettyBootstrapFactory = new NettyBootstrapFactory(networkConfiguration, name); |
| |
| clusterSvc = new ScaleCubeClusterServiceFactory().createClusterService( |
| name, |
| networkConfiguration, |
| nettyBootstrapFactory, |
| serializationRegistry, |
| new VaultStateIds(vaultMgr) |
| ); |
| |
| clock = new HybridClockImpl(); |
| |
| clockWaiter = new ClockWaiter(name, clock); |
| |
| RaftConfiguration raftConfiguration = nodeConfigRegistry.getConfiguration(RaftConfiguration.KEY); |
| |
| // TODO https://issues.apache.org/jira/browse/IGNITE-19051 |
| RaftGroupEventsClientListener raftGroupEventsClientListener = new RaftGroupEventsClientListener(); |
| |
| raftMgr = new Loza( |
| clusterSvc, |
| raftConfiguration, |
| workDir, |
| clock, |
| raftGroupEventsClientListener |
| ); |
| |
| LockManager lockMgr = new HeapLockManager(); |
| |
| ReplicaService replicaSvc = new ReplicaService(clusterSvc.messagingService(), clock); |
| |
| // TODO: IGNITE-16841 - use common RocksDB instance to store cluster state as well. |
| clusterStateStorage = new RocksDbClusterStateStorage(workDir.resolve(CMG_DB_PATH)); |
| |
| var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); |
| |
| ConfigurationTreeGenerator distributedConfigurationGenerator = new ConfigurationTreeGenerator( |
| modules.distributed().rootKeys(), |
| modules.distributed().schemaExtensions(), |
| modules.distributed().polymorphicSchemaExtensions() |
| ); |
| |
| ConfigurationValidator distributedCfgValidator = ConfigurationValidatorImpl.withDefaultValidators( |
| distributedConfigurationGenerator, |
| modules.distributed().validators() |
| ); |
| |
| ConfigurationDynamicDefaultsPatcher clusterCfgDynamicDefaultsPatcher = new ConfigurationDynamicDefaultsPatcherImpl( |
| modules.distributed(), |
| distributedConfigurationGenerator |
| ); |
| |
| clusterInitializer = new ClusterInitializer( |
| clusterSvc, |
| clusterCfgDynamicDefaultsPatcher, |
| distributedCfgValidator |
| ); |
| |
| NodeAttributesCollector nodeAttributesCollector = |
| new NodeAttributesCollector( |
| nodeConfigRegistry.getConfiguration(NodeAttributesConfiguration.KEY), |
| nodeConfigRegistry.getConfiguration(StoragesConfiguration.KEY) |
| ); |
| |
| cmgMgr = new ClusterManagementGroupManager( |
| vaultMgr, |
| clusterSvc, |
| clusterInitializer, |
| raftMgr, |
| clusterStateStorage, |
| logicalTopology, |
| nodeConfigRegistry.getConfiguration(ClusterManagementConfiguration.KEY), |
| nodeAttributesCollector |
| ); |
| |
| logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgMgr); |
| |
| var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( |
| clusterSvc, |
| logicalTopologyService, |
| Loza.FACTORY, |
| raftGroupEventsClientListener |
| ); |
| |
| metaStorageMgr = new MetaStorageManagerImpl( |
| vaultMgr, |
| clusterSvc, |
| cmgMgr, |
| logicalTopologyService, |
| raftMgr, |
| new RocksDbKeyValueStorage(name, workDir.resolve(METASTORAGE_DB_PATH)), |
| clock, |
| topologyAwareRaftGroupServiceFactory |
| ); |
| |
| this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr); |
| |
| clusterCfgMgr = new ConfigurationManager( |
| modules.distributed().rootKeys(), |
| cfgStorage, |
| distributedConfigurationGenerator, |
| distributedCfgValidator |
| ); |
| |
| ConfigurationRegistry clusterConfigRegistry = clusterCfgMgr.configurationRegistry(); |
| |
| metaStorageMgr.configure(clusterConfigRegistry.getConfiguration(MetaStorageConfiguration.KEY)); |
| |
| Consumer<LongFunction<CompletableFuture<?>>> registry = c -> metaStorageMgr.registerRevisionUpdateListener(c::apply); |
| |
| placementDriverMgr = new PlacementDriverManager( |
| name, |
| metaStorageMgr, |
| MetastorageGroupId.INSTANCE, |
| clusterSvc, |
| cmgMgr::metaStorageNodes, |
| logicalTopologyService, |
| raftMgr, |
| topologyAwareRaftGroupServiceFactory, |
| clock |
| ); |
| |
| ReplicationConfiguration replicationConfig = clusterConfigRegistry.getConfiguration(ReplicationConfiguration.KEY); |
| |
| LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig); |
| |
| replicaMgr = new ReplicaManager( |
| name, |
| clusterSvc, |
| cmgMgr, |
| clock, |
| Set.of(TableMessageGroup.class, TxMessageGroup.class), |
| placementDriverMgr.placementDriver(), |
| partitionIdleSafeTimePropagationPeriodMsSupplier |
| ); |
| |
| metricManager.configure(clusterConfigRegistry.getConfiguration(MetricConfiguration.KEY)); |
| |
| restAddressReporter = new RestAddressReporter(workDir); |
| |
| |
| DataStorageModules dataStorageModules = new DataStorageModules( |
| ServiceLoader.load(DataStorageModule.class, serviceProviderClassLoader) |
| ); |
| |
| Path storagePath = getPartitionsStorePath(workDir); |
| |
| GcConfiguration gcConfig = clusterConfigRegistry.getConfiguration(GcConfiguration.KEY); |
| |
| dataStorageMgr = new DataStorageManager( |
| dataStorageModules.createStorageEngines( |
| name, |
| nodeConfigRegistry, |
| storagePath, |
| longJvmPauseDetector |
| ) |
| ); |
| |
| volatileLogStorageFactoryCreator = new VolatileLogStorageFactoryCreator(workDir.resolve("volatile-log-spillout")); |
| |
| outgoingSnapshotsManager = new OutgoingSnapshotsManager(clusterSvc.messagingService()); |
| |
| SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry.getConfiguration( |
| SchemaSynchronizationConfiguration.KEY |
| ); |
| |
| LongSupplier delayDurationMsSupplier = delayDurationMsSupplier(schemaSyncConfig); |
| |
| CatalogManagerImpl catalogManager = new CatalogManagerImpl( |
| new UpdateLogImpl(metaStorageMgr), |
| clockWaiter, |
| delayDurationMsSupplier, |
| partitionIdleSafeTimePropagationPeriodMsSupplier |
| ); |
| |
| systemViewManager = new SystemViewManagerImpl(name, catalogManager); |
| nodeAttributesCollector.register(systemViewManager); |
| logicalTopology.addEventListener(systemViewManager); |
| systemViewManager.register(catalogManager); |
| |
| this.catalogManager = catalogManager; |
| |
| raftMgr.appendEntriesRequestInterceptor(new CheckCatalogVersionOnAppendEntries(catalogManager)); |
| raftMgr.actionRequestInterceptor(new CheckCatalogVersionOnActionRequest(catalogManager)); |
| |
| SchemaSyncService schemaSyncService = new SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier); |
| |
| schemaManager = new SchemaManager(registry, catalogManager, metaStorageMgr); |
| |
| distributionZoneManager = new DistributionZoneManager( |
| name, |
| registry, |
| metaStorageMgr, |
| logicalTopologyService, |
| vaultMgr, |
| catalogManager |
| ); |
| |
| // TODO: IGNITE-19344 - use nodeId that is validated on join (and probably generated differently). |
| txManager = new TxManagerImpl( |
| clusterSvc, |
| replicaSvc, |
| lockMgr, |
| clock, |
| new TransactionIdGenerator(() -> clusterSvc.nodeName().hashCode()), |
| placementDriverMgr.placementDriver(), |
| partitionIdleSafeTimePropagationPeriodMsSupplier |
| ); |
| |
| distributedTblMgr = new TableManager( |
| name, |
| registry, |
| gcConfig, |
| clusterSvc, |
| raftMgr, |
| replicaMgr, |
| lockMgr, |
| replicaSvc, |
| txManager, |
| dataStorageMgr, |
| storagePath, |
| metaStorageMgr, |
| schemaManager, |
| volatileLogStorageFactoryCreator, |
| clock, |
| outgoingSnapshotsManager, |
| topologyAwareRaftGroupServiceFactory, |
| vaultMgr, |
| distributionZoneManager, |
| schemaSyncService, |
| catalogManager, |
| observableTimestampTracker, |
| placementDriverMgr.placementDriver() |
| ); |
| |
| indexManager = new IndexManager(schemaManager, distributedTblMgr, catalogManager, metaStorageMgr, registry); |
| |
| indexBuildingManager = new IndexBuildingManager( |
| name, |
| replicaSvc, |
| catalogManager, |
| metaStorageMgr, |
| indexManager, |
| placementDriverMgr.placementDriver(), |
| clusterSvc, |
| clock |
| ); |
| |
| qryEngine = new SqlQueryProcessor( |
| registry, |
| clusterSvc, |
| logicalTopologyService, |
| distributedTblMgr, |
| schemaManager, |
| dataStorageMgr, |
| () -> dataStorageModules.collectSchemasFields(modules.local().polymorphicSchemaExtensions()), |
| replicaSvc, |
| clock, |
| schemaSyncService, |
| catalogManager, |
| metricManager, |
| systemViewManager, |
| placementDriverMgr.placementDriver() |
| ); |
| |
| sql = new IgniteSqlImpl(name, qryEngine, new IgniteTransactionsImpl(txManager, observableTimestampTracker)); |
| |
| var deploymentManagerImpl = new DeploymentManagerImpl( |
| clusterSvc, |
| new DeploymentUnitStoreImpl(metaStorageMgr), |
| logicalTopologyService, |
| workDir, |
| nodeConfigRegistry.getConfiguration(DeploymentConfiguration.KEY), |
| cmgMgr, |
| name |
| ); |
| deploymentManager = deploymentManagerImpl; |
| |
| computeComponent = new ComputeComponentImpl( |
| clusterSvc.messagingService(), |
| new JobContextManager(deploymentManagerImpl, deploymentManagerImpl.deploymentUnitAccessor(), new JobClassLoaderFactory()), |
| new ComputeExecutorImpl(this, nodeConfigRegistry.getConfiguration(ComputeConfiguration.KEY)) |
| ); |
| |
| compute = new IgniteComputeImpl(clusterSvc.topologyService(), distributedTblMgr, computeComponent); |
| |
| authenticationManager = createAuthenticationManager(); |
| |
| clientHandlerModule = new ClientHandlerModule( |
| qryEngine, |
| distributedTblMgr, |
| //TODO: IGNITE-20232 The observable timestamp should be different for each client. |
| new IgniteTransactionsImpl(txManager, new HybridTimestampTracker()), |
| nodeConfigRegistry, |
| compute, |
| clusterSvc, |
| nettyBootstrapFactory, |
| sql, |
| () -> cmgMgr.clusterState().thenApply(s -> s.clusterTag().clusterId()), |
| metricManager, |
| new ClientHandlerMetricSource(), |
| authenticationManager, |
| clock, |
| schemaSyncService, |
| catalogManager, |
| placementDriverMgr.placementDriver() |
| ); |
| |
| restComponent = createRestComponent(name); |
| } |
| |
| private static SameValueLongSupplier delayDurationMsSupplier(SchemaSynchronizationConfiguration schemaSyncConfig) { |
| return new SameValueLongSupplier(() -> schemaSyncConfig.delayDuration().value()); |
| } |
| |
| private static LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier(ReplicationConfiguration replicationConfig) { |
| return new SameValueLongSupplier(() -> replicationConfig.idleSafeTimePropagationDuration().value()); |
| } |
| |
| private AuthenticationManager createAuthenticationManager() { |
| SecurityConfiguration securityConfiguration = clusterCfgMgr.configurationRegistry() |
| .getConfiguration(SecurityConfiguration.KEY); |
| |
| AuthenticationManager manager = new AuthenticationManagerImpl(); |
| securityConfiguration.listen(manager); |
| return manager; |
| } |
| |
| private RestComponent createRestComponent(String name) { |
| Supplier<RestFactory> presentationsFactory = () -> new PresentationsFactory(nodeCfgMgr, clusterCfgMgr); |
| Supplier<RestFactory> clusterManagementRestFactory = () -> new ClusterManagementRestFactory(clusterSvc, clusterInitializer, cmgMgr); |
| Supplier<RestFactory> nodeManagementRestFactory = () -> new NodeManagementRestFactory(lifecycleManager, () -> name, |
| new JdbcPortProviderImpl(nodeCfgMgr.configurationRegistry())); |
| Supplier<RestFactory> nodeMetricRestFactory = () -> new MetricRestFactory(metricManager); |
| Supplier<RestFactory> authProviderFactory = () -> new AuthenticationProviderFactory(authenticationManager); |
| Supplier<RestFactory> deploymentCodeRestFactory = () -> new CodeDeploymentRestFactory(deploymentManager); |
| RestConfiguration restConfiguration = nodeCfgMgr.configurationRegistry().getConfiguration(RestConfiguration.KEY); |
| return new RestComponent( |
| List.of(presentationsFactory, |
| clusterManagementRestFactory, |
| nodeManagementRestFactory, |
| nodeMetricRestFactory, |
| deploymentCodeRestFactory, |
| authProviderFactory), |
| restConfiguration |
| ); |
| } |
| |
| private static MessageSerializationRegistry createSerializationRegistry(@Nullable ClassLoader classLoader) { |
| var serviceLoader = new SerializationRegistryServiceLoader(classLoader); |
| |
| var serializationRegistry = new MessageSerializationRegistryImpl(); |
| |
| serviceLoader.registerSerializationFactories(serializationRegistry); |
| |
| return serializationRegistry; |
| } |
| |
| private static ConfigurationModules loadConfigurationModules(@Nullable ClassLoader classLoader) { |
| var modulesProvider = new ServiceLoaderModulesProvider(); |
| List<ConfigurationModule> modules = modulesProvider.modules(classLoader); |
| |
| if (modules.isEmpty()) { |
| throw new IllegalStateException("No configuration modules were loaded, this means Ignite cannot start. " |
| + "Please make sure that the classloader for loading services is correct."); |
| } |
| |
| var configModules = new ConfigurationModules(modules); |
| |
| LOG.info("Configuration modules loaded [modules={}, localRoots={}, distRoots={}]", |
| modules, configModules.local().rootKeys(), configModules.distributed().rootKeys()); |
| |
| return configModules; |
| } |
| |
| /** |
| * Starts ignite node. |
| * |
| * <p>When this method returns, the node is partially started and ready to accept the init command (that is, its |
| * REST endpoint is functional). |
| * |
| * @param configPath Node configuration based on |
| * {@link NetworkConfigurationSchema}. Following rules are used for applying the |
| * configuration properties: |
| * |
| * <ol> |
| * <li>Specified property overrides existing one or just applies itself if it wasn't |
| * previously specified.</li> |
| * <li>All non-specified properties either use previous value or use default one from |
| * corresponding configuration schema.</li> |
| * </ol> |
| * |
| * So that, in case of initial node start (first start ever) specified configuration, supplemented with defaults, is |
| * used. If no configuration was provided defaults are used for all configuration properties. In case of node |
| * restart, specified properties override existing ones, non specified properties that also weren't specified |
| * previously use default values. Please pay attention that previously specified properties are searched in the |
| * {@code workDir} specified by the user. |
| */ |
| public CompletableFuture<Ignite> start(Path configPath) { |
| ExecutorService startupExecutor = Executors.newSingleThreadExecutor(NamedThreadFactory.create(name, "start", LOG)); |
| |
| try { |
| metricManager.registerSource(new JvmMetricSource()); |
| |
| lifecycleManager.startComponent(longJvmPauseDetector); |
| |
| lifecycleManager.startComponent(vaultMgr); |
| |
| vaultMgr.putName(name).get(); |
| |
| // Node configuration manager startup. |
| lifecycleManager.startComponent(nodeCfgMgr); |
| |
| // Start the components that are required to join the cluster. |
| lifecycleManager.startComponents( |
| clockWaiter, |
| nettyBootstrapFactory, |
| clusterSvc, |
| restComponent, |
| raftMgr, |
| clusterStateStorage, |
| cmgMgr |
| ); |
| |
| clusterSvc.updateMetadata(new NodeMetadata(restComponent.hostName(), restComponent.httpPort(), restComponent.httpsPort())); |
| |
| restAddressReporter.writeReport(restHttpAddress(), restHttpsAddress()); |
| |
| LOG.info("Components started, joining the cluster"); |
| |
| return cmgMgr.joinFuture() |
| .thenComposeAsync(unused -> { |
| LOG.info("Join complete, starting MetaStorage"); |
| |
| try { |
| lifecycleManager.startComponent(metaStorageMgr); |
| } catch (NodeStoppingException e) { |
| throw new CompletionException(e); |
| } |
| |
| return metaStorageMgr.recoveryFinishedFuture(); |
| }, startupExecutor) |
| .thenComposeAsync(unused -> initializeClusterConfiguration(startupExecutor), startupExecutor) |
| .thenRunAsync(() -> { |
| LOG.info("MetaStorage started, starting the remaining components"); |
| |
| // Start all other components after the join request has completed and the node has been validated. |
| try { |
| lifecycleManager.startComponents( |
| catalogManager, |
| clusterCfgMgr, |
| placementDriverMgr, |
| metricManager, |
| distributionZoneManager, |
| computeComponent, |
| replicaMgr, |
| txManager, |
| dataStorageMgr, |
| schemaManager, |
| volatileLogStorageFactoryCreator, |
| outgoingSnapshotsManager, |
| distributedTblMgr, |
| indexManager, |
| indexBuildingManager, |
| qryEngine, |
| clientHandlerModule, |
| deploymentManager, |
| sql |
| ); |
| |
| // The system view manager comes last because other components |
| // must register system views before it starts. |
| lifecycleManager.startComponent(systemViewManager); |
| } catch (NodeStoppingException e) { |
| throw new CompletionException(e); |
| } |
| }, startupExecutor) |
| .thenComposeAsync(v -> { |
| LOG.info("Components started, performing recovery"); |
| |
| return recoverComponentsStateOnStart(startupExecutor); |
| }, startupExecutor) |
| .thenComposeAsync(v -> clusterCfgMgr.configurationRegistry().onDefaultsPersisted(), startupExecutor) |
| // Signal that local recovery is complete and the node is ready to join the cluster. |
| .thenComposeAsync(v -> { |
| LOG.info("Recovery complete, finishing join"); |
| |
| return cmgMgr.onJoinReady(); |
| }, startupExecutor) |
| .thenRunAsync(() -> { |
| try { |
| // Transfer the node to the STARTED state. |
| lifecycleManager.onStartComplete(); |
| } catch (NodeStoppingException e) { |
| throw new CompletionException(e); |
| } |
| }, startupExecutor) |
| .handleAsync((v, e) -> { |
| if (e != null) { |
| throw handleStartException(e); |
| } |
| |
| return (Ignite) this; |
| }, startupExecutor) |
| // Moving to the common pool on purpose to close the startup pool and proceed user's code in the common pool. |
| .whenCompleteAsync((res, ex) -> { |
| startupExecutor.shutdownNow(); |
| }); |
| } catch (Throwable e) { |
| startupExecutor.shutdownNow(); |
| |
| throw handleStartException(e); |
| } |
| } |
| |
| private RuntimeException handleStartException(Throwable e) { |
| String errMsg = "Unable to start [node=" + name + "]"; |
| |
| LOG.debug(errMsg, e); |
| |
| lifecycleManager.stopNode(); |
| |
| return new IgniteException(errMsg, e); |
| } |
| |
| /** |
| * Stops ignite node. |
| */ |
| public void stop() { |
| lifecycleManager.stopNode(); |
| restAddressReporter.removeReport(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public IgniteTables tables() { |
| return distributedTblMgr; |
| } |
| |
| @TestOnly |
| public QueryProcessor queryEngine() { |
| return qryEngine; |
| } |
| |
| @TestOnly |
| public SystemViewManager systemViewManager() { |
| return systemViewManager; |
| } |
| |
| @TestOnly |
| public MetaStorageManager metaStorageManager() { |
| return metaStorageMgr; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public IgniteTransactions transactions() { |
| return new IgniteTransactionsImpl(txManager, observableTimestampTracker); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public IgniteSql sql() { |
| return sql; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void close() { |
| IgnitionManager.stop(name); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public String name() { |
| return name; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public IgniteCompute compute() { |
| return compute; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public Collection<ClusterNode> clusterNodes() { |
| return clusterSvc.topologyService().allMembers(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public CompletableFuture<Collection<ClusterNode>> clusterNodesAsync() { |
| return completedFuture(clusterNodes()); |
| } |
| |
| /** |
| * Returns node configuration. |
| */ |
| public ConfigurationRegistry nodeConfiguration() { |
| return nodeCfgMgr.configurationRegistry(); |
| } |
| |
| /** |
| * Returns cluster configuration. |
| */ |
| public ConfigurationRegistry clusterConfiguration() { |
| return clusterCfgMgr.configurationRegistry(); |
| } |
| |
| /** |
| * Returns the id of the current node. |
| */ |
| // TODO: should be encapsulated in local properties, see https://issues.apache.org/jira/browse/IGNITE-15131 |
| public String id() { |
| return clusterSvc.topologyService().localMember().id(); |
| } |
| |
| /** |
| * Returns the local HTTP address of REST endpoints. |
| * |
| * @return address or null if HTTP is not enabled. |
| * @throws IgniteInternalException if the REST module is not started. |
| */ |
| // TODO: should be encapsulated in local properties, see https://issues.apache.org/jira/browse/IGNITE-15131 |
| @Nullable |
| public NetworkAddress restHttpAddress() { |
| String host = restComponent.hostName(); |
| int port = restComponent.httpPort(); |
| if (port != -1) { |
| return new NetworkAddress(host, port); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Returns the local HTTPS address of REST endpoints. |
| * |
| * @return address or null if HTTPS is not enabled. |
| * @throws IgniteInternalException if the REST module is not started. |
| */ |
| // TODO: should be encapsulated in local properties, see https://issues.apache.org/jira/browse/IGNITE-15131 |
| public NetworkAddress restHttpsAddress() { |
| String host = restComponent.hostName(); |
| int port = restComponent.httpsPort(); |
| if (port != -1) { |
| return new NetworkAddress(host, port); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Returns the local address of the Thin Client. |
| * |
| * @throws IgniteInternalException if the Client module is not started. |
| */ |
| // TODO: should be encapsulated in local properties, see https://issues.apache.org/jira/browse/IGNITE-15131 |
| public NetworkAddress clientAddress() { |
| return NetworkAddress.from(clientHandlerModule.localAddress()); |
| } |
| |
| /** |
| * Initializes the cluster that this node is present in. |
| * |
| * @param metaStorageNodeNames names of nodes that will host the Meta Storage. |
| * @param cmgNodeNames names of nodes that will host the CMG. |
| * @param clusterName Human-readable name of a cluster. |
| * @param clusterConfiguration cluster configuration, that will be applied after init. |
| * @throws NodeStoppingException If node stopping intention was detected. |
| */ |
| public void init( |
| Collection<String> metaStorageNodeNames, |
| Collection<String> cmgNodeNames, |
| String clusterName, |
| String clusterConfiguration |
| ) throws NodeStoppingException { |
| cmgMgr.initCluster(metaStorageNodeNames, cmgNodeNames, clusterName, clusterConfiguration); |
| } |
| |
| /** |
| * Checks if the local revision is {@code 0} and initializes the cluster configuration with the specified user-provided configuration |
| * upon cluster initialization. If the local revision is not {@code 0}, does nothing. |
| */ |
| private CompletableFuture<Void> initializeClusterConfiguration(ExecutorService startupExecutor) { |
| return cfgStorage.localRevision() |
| .thenComposeAsync(appliedRevision -> { |
| if (appliedRevision != 0) { |
| return completedFuture(null); |
| } else { |
| return cmgMgr.initialClusterConfigurationFuture() |
| .thenAcceptAsync(initialConfigHocon -> { |
| if (initialConfigHocon == null) { |
| return; |
| } |
| |
| Config config = ConfigFactory.parseString(initialConfigHocon); |
| ConfigurationSource hoconSource = HoconConverter.hoconSource(config.root()); |
| clusterCfgMgr.configurationRegistry().initializeConfigurationWith(hoconSource); |
| }, startupExecutor); |
| } |
| }, startupExecutor); |
| } |
| |
| /** |
| * Recovers components state on start by invoking configuration listeners ({@link #notifyConfigurationListeners()} and deploying watches |
| * after that. |
| */ |
| private CompletableFuture<?> recoverComponentsStateOnStart(ExecutorService startupExecutor) { |
| CompletableFuture<Void> startupConfigurationUpdate = notifyConfigurationListeners(); |
| CompletableFuture<Void> startupRevisionUpdate = metaStorageMgr.notifyRevisionUpdateListenerOnStart(); |
| |
| return CompletableFuture.allOf(startupConfigurationUpdate, startupRevisionUpdate) |
| .thenComposeAsync(t -> { |
| // Deploy all registered watches because all components are ready and have registered their listeners. |
| return metaStorageMgr.deployWatches(); |
| }, startupExecutor); |
| } |
| |
| /** |
| * Notify all listeners of current configurations. |
| */ |
| private CompletableFuture<Void> notifyConfigurationListeners() { |
| return CompletableFuture.allOf( |
| nodeConfiguration().notifyCurrentConfigurationListeners(), |
| clusterConfiguration().notifyCurrentConfigurationListeners() |
| ); |
| } |
| |
| /** |
| * Starts the Vault component. |
| */ |
| private static VaultManager createVault(String nodeName, Path workDir) { |
| Path vaultPath = workDir.resolve(VAULT_DB_PATH); |
| |
| try { |
| Files.createDirectories(vaultPath); |
| } catch (IOException e) { |
| throw new IgniteInternalException(e); |
| } |
| |
| return new VaultManager(new PersistentVaultService(nodeName, vaultPath)); |
| } |
| |
| /** |
| * Returns a path to the partitions store directory. Creates a directory if it doesn't exist. |
| * |
| * @param workDir Ignite work directory. |
| * @return Partitions store path. |
| */ |
| private static Path getPartitionsStorePath(Path workDir) { |
| Path partitionsStore = workDir.resolve(PARTITIONS_STORE_PATH); |
| |
| try { |
| Files.createDirectories(partitionsStore); |
| } catch (IOException e) { |
| throw new IgniteInternalException("Failed to create directory for partitions storage: " + e.getMessage(), e); |
| } |
| |
| return partitionsStore; |
| } |
| |
| @TestOnly |
| public Loza raftManager() { |
| return raftMgr; |
| } |
| |
| @TestOnly |
| public ClusterNode node() { |
| return clusterSvc.topologyService().localMember(); |
| } |
| |
| @TestOnly |
| public DistributionZoneManager distributionZoneManager() { |
| return distributionZoneManager; |
| } |
| |
| @TestOnly |
| public LogicalTopologyService logicalTopologyService() { |
| return logicalTopologyService; |
| } |
| |
| @TestOnly |
| public IgniteDeployment deployment() { |
| return deploymentManager; |
| } |
| |
| // TODO: IGNITE-18493 - remove/move this |
| @TestOnly |
| public void dropMessages(BiPredicate<String, NetworkMessage> predicate) { |
| ((DefaultMessagingService) clusterSvc.messagingService()).dropMessages(predicate); |
| } |
| |
| // TODO: IGNITE-18493 - remove/move this |
| @TestOnly |
| @Nullable |
| public BiPredicate<String, NetworkMessage> dropMessagesPredicate() { |
| return ((DefaultMessagingService) clusterSvc.messagingService()).dropMessagesPredicate(); |
| } |
| |
| // TODO: IGNITE-18493 - remove/move this |
| @TestOnly |
| public void stopDroppingMessages() { |
| ((DefaultMessagingService) clusterSvc.messagingService()).stopDroppingMessages(); |
| } |
| |
| /** Returns the node's hybrid clock. */ |
| @TestOnly |
| public HybridClock clock() { |
| return clock; |
| } |
| |
| /** Returns the node's transaction manager. */ |
| @TestOnly |
| public TxManager txManager() { |
| return txManager; |
| } |
| |
| /** Returns the node's placement driver service. */ |
| @TestOnly |
| public PlacementDriver placementDriver() { |
| return placementDriverMgr.placementDriver(); |
| } |
| |
| /** Returns the node's catalog manager. */ |
| @TestOnly |
| public CatalogManager catalogManager() { |
| return catalogManager; |
| } |
| } |