| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.app; |
| |
| import static java.util.concurrent.CompletableFuture.completedFuture; |
| import static java.util.stream.Collectors.toSet; |
| import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE; |
| import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ; |
| import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE; |
| import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; |
| |
| import com.typesafe.config.Config; |
| import com.typesafe.config.ConfigFactory; |
| import java.io.IOException; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.ServiceLoader; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ForkJoinPool; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.function.BiPredicate; |
| import java.util.function.Consumer; |
| import java.util.function.LongFunction; |
| import java.util.function.LongSupplier; |
| import java.util.function.Supplier; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgnitionManager; |
| import org.apache.ignite.catalog.IgniteCatalog; |
| import org.apache.ignite.catalog.Options; |
| import org.apache.ignite.client.handler.ClientHandlerMetricSource; |
| import org.apache.ignite.client.handler.ClientHandlerModule; |
| import org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration; |
| import org.apache.ignite.compute.IgniteCompute; |
| import org.apache.ignite.configuration.ConfigurationDynamicDefaultsPatcher; |
| import org.apache.ignite.configuration.ConfigurationModule; |
| import org.apache.ignite.internal.catalog.CatalogManager; |
| import org.apache.ignite.internal.catalog.CatalogManagerImpl; |
| import org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration; |
| import org.apache.ignite.internal.catalog.sql.IgniteCatalogSqlImpl; |
| import org.apache.ignite.internal.catalog.storage.UpdateLogImpl; |
| import org.apache.ignite.internal.cluster.management.ClusterInitializer; |
| import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; |
| import org.apache.ignite.internal.cluster.management.NodeAttributesCollector; |
| import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration; |
| import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration; |
| import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage; |
| import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage; |
| import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; |
| import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; |
| import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; |
| import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; |
| import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; |
| import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; |
| import org.apache.ignite.internal.component.RestAddressReporter; |
| import org.apache.ignite.internal.components.LogSyncer; |
| import org.apache.ignite.internal.components.LongJvmPauseDetector; |
| import org.apache.ignite.internal.compute.AntiHijackIgniteCompute; |
| import org.apache.ignite.internal.compute.ComputeComponent; |
| import org.apache.ignite.internal.compute.ComputeComponentImpl; |
| import org.apache.ignite.internal.compute.IgniteComputeImpl; |
| import org.apache.ignite.internal.compute.IgniteComputeInternal; |
| import org.apache.ignite.internal.compute.configuration.ComputeConfiguration; |
| import org.apache.ignite.internal.compute.executor.ComputeExecutorImpl; |
| import org.apache.ignite.internal.compute.loader.JobClassLoaderFactory; |
| import org.apache.ignite.internal.compute.loader.JobContextManager; |
| import org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine; |
| import org.apache.ignite.internal.configuration.ConfigurationDynamicDefaultsPatcherImpl; |
| import org.apache.ignite.internal.configuration.ConfigurationManager; |
| import org.apache.ignite.internal.configuration.ConfigurationModules; |
| import org.apache.ignite.internal.configuration.ConfigurationRegistry; |
| import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator; |
| import org.apache.ignite.internal.configuration.JdbcPortProviderImpl; |
| import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider; |
| import org.apache.ignite.internal.configuration.hocon.HoconConverter; |
| import org.apache.ignite.internal.configuration.storage.ConfigurationStorage; |
| import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage; |
| import org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage; |
| import org.apache.ignite.internal.configuration.tree.ConfigurationSource; |
| import org.apache.ignite.internal.configuration.validation.ConfigurationValidator; |
| import org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl; |
| import org.apache.ignite.internal.deployunit.DeploymentManagerImpl; |
| import org.apache.ignite.internal.deployunit.IgniteDeployment; |
| import org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration; |
| import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl; |
| import org.apache.ignite.internal.distributionzones.DistributionZoneManager; |
| import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration; |
| import org.apache.ignite.internal.eventlog.impl.EventLogImpl; |
| import org.apache.ignite.internal.failure.FailureProcessor; |
| import org.apache.ignite.internal.hlc.ClockService; |
| import org.apache.ignite.internal.hlc.ClockServiceImpl; |
| import org.apache.ignite.internal.hlc.ClockWaiter; |
| import org.apache.ignite.internal.hlc.HybridClock; |
| import org.apache.ignite.internal.hlc.HybridClockImpl; |
| import org.apache.ignite.internal.index.IndexBuildingManager; |
| import org.apache.ignite.internal.index.IndexManager; |
| import org.apache.ignite.internal.index.IndexNodeFinishedRwTransactionsChecker; |
| import org.apache.ignite.internal.lang.IgniteInternalException; |
| import org.apache.ignite.internal.lang.NodeStoppingException; |
| import org.apache.ignite.internal.logger.IgniteLogger; |
| import org.apache.ignite.internal.logger.Loggers; |
| import org.apache.ignite.internal.lowwatermark.LowWatermarkImpl; |
| import org.apache.ignite.internal.metastorage.MetaStorageManager; |
| import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; |
| import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; |
| import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; |
| import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; |
| import org.apache.ignite.internal.metrics.MetricManager; |
| import org.apache.ignite.internal.metrics.MetricManagerImpl; |
| import org.apache.ignite.internal.metrics.configuration.MetricConfiguration; |
| import org.apache.ignite.internal.metrics.sources.JvmMetricSource; |
| import org.apache.ignite.internal.network.ChannelType; |
| import org.apache.ignite.internal.network.ClusterService; |
| import org.apache.ignite.internal.network.DefaultMessagingService; |
| import org.apache.ignite.internal.network.MessageSerializationRegistryImpl; |
| import org.apache.ignite.internal.network.MessagingService; |
| import org.apache.ignite.internal.network.NettyBootstrapFactory; |
| import org.apache.ignite.internal.network.NettyWorkersRegistrar; |
| import org.apache.ignite.internal.network.NetworkMessage; |
| import org.apache.ignite.internal.network.configuration.NetworkConfiguration; |
| import org.apache.ignite.internal.network.configuration.NetworkConfigurationSchema; |
| import org.apache.ignite.internal.network.recovery.VaultStaleIds; |
| import org.apache.ignite.internal.network.scalecube.ScaleCubeClusterServiceFactory; |
| import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry; |
| import org.apache.ignite.internal.network.serialization.SerializationRegistryServiceLoader; |
| import org.apache.ignite.internal.network.wrapper.JumpToExecutorByConsistentIdAfterSend; |
| import org.apache.ignite.internal.placementdriver.PlacementDriver; |
| import org.apache.ignite.internal.placementdriver.PlacementDriverManager; |
| import org.apache.ignite.internal.raft.Loza; |
| import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; |
| import org.apache.ignite.internal.raft.configuration.RaftConfiguration; |
| import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator; |
| import org.apache.ignite.internal.replicator.ReplicaManager; |
| import org.apache.ignite.internal.replicator.ReplicaService; |
| import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration; |
| import org.apache.ignite.internal.rest.RestComponent; |
| import org.apache.ignite.internal.rest.RestFactory; |
| import org.apache.ignite.internal.rest.RestManager; |
| import org.apache.ignite.internal.rest.RestManagerFactory; |
| import org.apache.ignite.internal.rest.authentication.AuthenticationProviderFactory; |
| import org.apache.ignite.internal.rest.cluster.ClusterManagementRestFactory; |
| import org.apache.ignite.internal.rest.compute.ComputeRestFactory; |
| import org.apache.ignite.internal.rest.configuration.PresentationsFactory; |
| import org.apache.ignite.internal.rest.configuration.RestConfiguration; |
| import org.apache.ignite.internal.rest.deployment.CodeDeploymentRestFactory; |
| import org.apache.ignite.internal.rest.metrics.MetricRestFactory; |
| import org.apache.ignite.internal.rest.node.NodeManagementRestFactory; |
| import org.apache.ignite.internal.rest.recovery.DisasterRecoveryFactory; |
| import org.apache.ignite.internal.schema.SchemaManager; |
| import org.apache.ignite.internal.schema.configuration.GcConfiguration; |
| import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration; |
| import org.apache.ignite.internal.security.authentication.AuthenticationManager; |
| import org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl; |
| import org.apache.ignite.internal.security.configuration.SecurityConfiguration; |
| import org.apache.ignite.internal.sql.api.IgniteSqlImpl; |
| import org.apache.ignite.internal.sql.api.PublicApiThreadingIgniteSql; |
| import org.apache.ignite.internal.sql.configuration.distributed.SqlDistributedConfiguration; |
| import org.apache.ignite.internal.sql.configuration.local.SqlLocalConfiguration; |
| import org.apache.ignite.internal.sql.engine.QueryProcessor; |
| import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; |
| import org.apache.ignite.internal.storage.DataStorageManager; |
| import org.apache.ignite.internal.storage.DataStorageModule; |
| import org.apache.ignite.internal.storage.DataStorageModules; |
| import org.apache.ignite.internal.storage.configurations.StorageConfiguration; |
| import org.apache.ignite.internal.storage.engine.StorageEngine; |
| import org.apache.ignite.internal.storage.engine.ThreadAssertingStorageEngine; |
| import org.apache.ignite.internal.systemview.SystemViewManagerImpl; |
| import org.apache.ignite.internal.systemview.api.SystemViewManager; |
| import org.apache.ignite.internal.table.distributed.PublicApiThreadingIgniteTables; |
| import org.apache.ignite.internal.table.distributed.TableManager; |
| import org.apache.ignite.internal.table.distributed.TableMessageGroup; |
| import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager; |
| import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; |
| import org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnActionRequest; |
| import org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries; |
| import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; |
| import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl; |
| import org.apache.ignite.internal.thread.IgniteThreadFactory; |
| import org.apache.ignite.internal.thread.NamedThreadFactory; |
| import org.apache.ignite.internal.tx.HybridTimestampTracker; |
| import org.apache.ignite.internal.tx.LockManager; |
| import org.apache.ignite.internal.tx.TxManager; |
| import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; |
| import org.apache.ignite.internal.tx.impl.HeapLockManager; |
| import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; |
| import org.apache.ignite.internal.tx.impl.PublicApiThreadingIgniteTransactions; |
| import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry; |
| import org.apache.ignite.internal.tx.impl.ResourceVacuumManager; |
| import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; |
| import org.apache.ignite.internal.tx.impl.TransactionInflights; |
| import org.apache.ignite.internal.tx.impl.TxManagerImpl; |
| import org.apache.ignite.internal.tx.message.TxMessageGroup; |
| import org.apache.ignite.internal.vault.VaultManager; |
| import org.apache.ignite.internal.vault.VaultService; |
| import org.apache.ignite.internal.vault.persistence.PersistentVaultService; |
| import org.apache.ignite.internal.worker.CriticalWorkerWatchdog; |
| import org.apache.ignite.internal.worker.ThreadAssertions; |
| import org.apache.ignite.internal.worker.configuration.CriticalWorkersConfiguration; |
| import org.apache.ignite.lang.IgniteException; |
| import org.apache.ignite.network.ClusterNode; |
| import org.apache.ignite.network.NetworkAddress; |
| import org.apache.ignite.network.NodeMetadata; |
| import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; |
| import org.apache.ignite.sql.IgniteSql; |
| import org.apache.ignite.table.manager.IgniteTables; |
| import org.apache.ignite.tx.IgniteTransactions; |
| import org.jetbrains.annotations.Nullable; |
| import org.jetbrains.annotations.TestOnly; |
| |
| /** |
| * Ignite internal implementation. |
| */ |
| public class IgniteImpl implements Ignite { |
| /** The logger. */ |
| private static final IgniteLogger LOG = Loggers.forClass(IgniteImpl.class); |
| |
| /** |
| * Path to the persistent storage used by the {@link VaultService} component. |
| */ |
| private static final Path VAULT_DB_PATH = Paths.get("vault"); |
| |
| /** |
| * Path to the persistent storage used by the {@link MetaStorageManager} component. |
| */ |
| private static final Path METASTORAGE_DB_PATH = Paths.get("metastorage"); |
| |
| /** |
| * Path to the persistent storage used by the {@link ClusterManagementGroupManager} component. |
| */ |
| private static final Path CMG_DB_PATH = Paths.get("cmg"); |
| |
| /** |
| * Path for the partitions persistent storage. |
| */ |
| private static final Path PARTITIONS_STORE_PATH = Paths.get("db"); |
| |
| /** Ignite node name. */ |
| private final String name; |
| |
| /** Lifecycle manager. */ |
| private final LifecycleManager lifecycleManager; |
| |
| private final ThreadPoolsManager threadPoolsManager; |
| |
| /** Vault manager. */ |
| private final VaultManager vaultMgr; |
| |
| /** Sql query engine. */ |
| private final SqlQueryProcessor qryEngine; |
| |
| /** Sql API facade. */ |
| private final IgniteSqlImpl sql; |
| |
| /** Configuration manager that handles node (local) configuration. */ |
| private final ConfigurationManager nodeCfgMgr; |
| |
| /** Cluster service (cluster network manager). */ |
| private final ClusterService clusterSvc; |
| |
| private final ComputeComponent computeComponent; |
| |
| private final CriticalWorkerWatchdog criticalWorkerRegistry; |
| |
| /** Failure processor. */ |
| private final FailureProcessor failureProcessor; |
| |
| /** Netty bootstrap factory. */ |
| private final NettyBootstrapFactory nettyBootstrapFactory; |
| |
| private final NettyWorkersRegistrar nettyWorkersRegistrar; |
| |
| /** Raft manager. */ |
| private final Loza raftMgr; |
| |
| /** Meta storage manager. */ |
| private final MetaStorageManagerImpl metaStorageMgr; |
| |
| /** Placement driver manager. */ |
| private final PlacementDriverManager placementDriverMgr; |
| |
| /** Configuration manager that handles cluster (distributed) configuration. */ |
| private final ConfigurationManager clusterCfgMgr; |
| |
| /** Cluster initializer. */ |
| private final ClusterInitializer clusterInitializer; |
| |
| /** Replica manager. */ |
| private final ReplicaManager replicaMgr; |
| |
| /** Transactions manager. */ |
| private final TxManager txManager; |
| |
| /** Distributed table manager. */ |
| private final TableManager distributedTblMgr; |
| |
| /** Disaster recovery manager. */ |
| private final DisasterRecoveryManager disasterRecoveryManager; |
| |
| private final IndexManager indexManager; |
| |
| /** Rest module. */ |
| private final RestComponent restComponent; |
| |
| private final ClusterStateStorage clusterStateStorage; |
| |
| private final ClusterManagementGroupManager cmgMgr; |
| |
| private final LogicalTopologyService logicalTopologyService; |
| |
| /** Client handler module. */ |
| private final ClientHandlerModule clientHandlerModule; |
| |
| /** Distributed configuration storage. */ |
| private final ConfigurationStorage cfgStorage; |
| |
| /** Compute. */ |
| private final IgniteComputeInternal compute; |
| |
| /** JVM pause detector. */ |
| private final LongJvmPauseDetector longJvmPauseDetector; |
| |
| /** Data storage manager. */ |
| private final DataStorageManager dataStorageMgr; |
| |
| /** Schema manager. */ |
| private final SchemaManager schemaManager; |
| |
| /** Metric manager. */ |
| private final MetricManager metricManager; |
| |
| private final IgniteDeployment deploymentManager; |
| |
| private final DistributionZoneManager distributionZoneManager; |
| |
| /** Creator for volatile {@link org.apache.ignite.internal.raft.storage.LogStorageFactory} instances. */ |
| private final VolatileLogStorageFactoryCreator volatileLogStorageFactoryCreator; |
| |
| /** A hybrid logical clock. */ |
| private final HybridClock clock; |
| |
| private final ClockWaiter clockWaiter; |
| |
| private final ClockService clockService; |
| |
| private final LowWatermarkImpl lowWatermark; |
| |
| private final OutgoingSnapshotsManager outgoingSnapshotsManager; |
| |
| private final RestAddressReporter restAddressReporter; |
| |
| private final CatalogManager catalogManager; |
| |
| private final AuthenticationManager authenticationManager; |
| |
| /** Timestamp tracker for embedded transactions. */ |
| private final HybridTimestampTracker observableTimestampTracker = new HybridTimestampTracker(); |
| |
| /** System views manager. */ |
| private final SystemViewManagerImpl systemViewManager; |
| |
| /** Index building manager. */ |
| private final IndexBuildingManager indexBuildingManager; |
| |
| /** Local node RW transaction completion checker for indexes. */ |
| private final IndexNodeFinishedRwTransactionsChecker indexNodeFinishedRwTransactionsChecker; |
| |
| /** Cleanup manager for tx resources. */ |
| private final ResourceVacuumManager resourceVacuumManager; |
| |
| /** Remote triggered resources registry. */ |
| private final RemotelyTriggeredResourceRegistry resourcesRegistry; |
| |
| private final Executor asyncContinuationExecutor = ForkJoinPool.commonPool(); |
| |
| /** |
| * The Constructor. |
| * |
| * @param name Ignite node name. |
| * @param configPath Path to node configuration in the HOCON format. |
| * @param workDir Work directory for the started node. Must not be {@code null}. |
| * @param serviceProviderClassLoader The class loader to be used to load provider-configuration files and provider classes, or |
| * {@code null} if the system class loader (or, failing that the bootstrap class loader) is to be used. |
| */ |
| IgniteImpl(String name, Path configPath, Path workDir, @Nullable ClassLoader serviceProviderClassLoader) { |
| this.name = name; |
| |
| longJvmPauseDetector = new LongJvmPauseDetector(name); |
| |
| lifecycleManager = new LifecycleManager(name); |
| |
| threadPoolsManager = new ThreadPoolsManager(name); |
| |
| vaultMgr = createVault(workDir); |
| |
| metricManager = new MetricManagerImpl(); |
| |
| ConfigurationModules modules = loadConfigurationModules(serviceProviderClassLoader); |
| |
| ConfigurationTreeGenerator localConfigurationGenerator = new ConfigurationTreeGenerator( |
| modules.local().rootKeys(), |
| modules.local().schemaExtensions(), |
| modules.local().polymorphicSchemaExtensions() |
| ); |
| |
| LocalFileConfigurationStorage localFileConfigurationStorage = new LocalFileConfigurationStorage( |
| name, |
| configPath, |
| localConfigurationGenerator, |
| modules.local() |
| ); |
| |
| ConfigurationValidator localConfigurationValidator = |
| ConfigurationValidatorImpl.withDefaultValidators(localConfigurationGenerator, modules.local().validators()); |
| |
| nodeCfgMgr = new ConfigurationManager( |
| modules.local().rootKeys(), |
| localFileConfigurationStorage, |
| localConfigurationGenerator, |
| localConfigurationValidator |
| ); |
| |
| ConfigurationRegistry nodeConfigRegistry = nodeCfgMgr.configurationRegistry(); |
| |
| NetworkConfiguration networkConfiguration = nodeConfigRegistry.getConfiguration(NetworkConfiguration.KEY); |
| |
| MessageSerializationRegistry serializationRegistry = createSerializationRegistry(serviceProviderClassLoader); |
| |
| // TODO https://issues.apache.org/jira/browse/IGNITE-20450 |
| failureProcessor = new FailureProcessor(name); |
| |
| CriticalWorkersConfiguration criticalWorkersConfiguration = nodeConfigRegistry.getConfiguration(CriticalWorkersConfiguration.KEY); |
| |
| criticalWorkerRegistry = new CriticalWorkerWatchdog( |
| criticalWorkersConfiguration, |
| threadPoolsManager.commonScheduler(), |
| failureProcessor |
| ); |
| |
| nettyBootstrapFactory = new NettyBootstrapFactory(networkConfiguration, name); |
| nettyWorkersRegistrar = new NettyWorkersRegistrar( |
| criticalWorkerRegistry, |
| threadPoolsManager.commonScheduler(), |
| nettyBootstrapFactory, |
| criticalWorkersConfiguration, |
| failureProcessor |
| ); |
| |
| clusterSvc = new ScaleCubeClusterServiceFactory().createClusterService( |
| name, |
| networkConfiguration, |
| nettyBootstrapFactory, |
| serializationRegistry, |
| new VaultStaleIds(vaultMgr), |
| criticalWorkerRegistry, |
| failureProcessor |
| ); |
| |
| clock = new HybridClockImpl(); |
| |
| clockWaiter = new ClockWaiter(name, clock); |
| |
| RaftConfiguration raftConfiguration = nodeConfigRegistry.getConfiguration(RaftConfiguration.KEY); |
| |
| // TODO https://issues.apache.org/jira/browse/IGNITE-19051 |
| RaftGroupEventsClientListener raftGroupEventsClientListener = new RaftGroupEventsClientListener(); |
| |
| raftMgr = new Loza( |
| clusterSvc, |
| metricManager, |
| raftConfiguration, |
| workDir, |
| clock, |
| raftGroupEventsClientListener |
| ); |
| |
| LockManager lockMgr = new HeapLockManager(); |
| |
| MessagingService messagingServiceReturningToStorageOperationsPool = new JumpToExecutorByConsistentIdAfterSend( |
| clusterSvc.messagingService(), |
| name, |
| message -> threadPoolsManager.partitionOperationsExecutor() |
| ); |
| |
| // TODO: IGNITE-16841 - use common RocksDB instance to store cluster state as well. |
| clusterStateStorage = new RocksDbClusterStateStorage(workDir.resolve(CMG_DB_PATH), name); |
| |
| var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); |
| |
| ConfigurationTreeGenerator distributedConfigurationGenerator = new ConfigurationTreeGenerator( |
| modules.distributed().rootKeys(), |
| modules.distributed().schemaExtensions(), |
| modules.distributed().polymorphicSchemaExtensions() |
| ); |
| |
| ConfigurationValidator distributedCfgValidator = ConfigurationValidatorImpl.withDefaultValidators( |
| distributedConfigurationGenerator, |
| modules.distributed().validators() |
| ); |
| |
| ConfigurationDynamicDefaultsPatcher clusterCfgDynamicDefaultsPatcher = new ConfigurationDynamicDefaultsPatcherImpl( |
| modules.distributed(), |
| distributedConfigurationGenerator |
| ); |
| |
| clusterInitializer = new ClusterInitializer( |
| clusterSvc, |
| clusterCfgDynamicDefaultsPatcher, |
| distributedCfgValidator |
| ); |
| |
| NodeAttributesCollector nodeAttributesCollector = |
| new NodeAttributesCollector( |
| nodeConfigRegistry.getConfiguration(NodeAttributesConfiguration.KEY), |
| nodeConfigRegistry.getConfiguration(StorageConfiguration.KEY) |
| ); |
| |
| cmgMgr = new ClusterManagementGroupManager( |
| vaultMgr, |
| clusterSvc, |
| clusterInitializer, |
| raftMgr, |
| clusterStateStorage, |
| logicalTopology, |
| nodeConfigRegistry.getConfiguration(ClusterManagementConfiguration.KEY), |
| nodeAttributesCollector |
| ); |
| |
| logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgMgr); |
| |
| var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( |
| clusterSvc, |
| logicalTopologyService, |
| Loza.FACTORY, |
| raftGroupEventsClientListener |
| ); |
| |
| metaStorageMgr = new MetaStorageManagerImpl( |
| clusterSvc, |
| cmgMgr, |
| logicalTopologyService, |
| raftMgr, |
| new RocksDbKeyValueStorage(name, workDir.resolve(METASTORAGE_DB_PATH), failureProcessor), |
| clock, |
| topologyAwareRaftGroupServiceFactory |
| ); |
| |
| this.cfgStorage = new DistributedConfigurationStorage(name, metaStorageMgr); |
| |
| clusterCfgMgr = new ConfigurationManager( |
| modules.distributed().rootKeys(), |
| cfgStorage, |
| distributedConfigurationGenerator, |
| distributedCfgValidator |
| ); |
| |
| ConfigurationRegistry clusterConfigRegistry = clusterCfgMgr.configurationRegistry(); |
| |
| metaStorageMgr.configure(clusterConfigRegistry.getConfiguration(MetaStorageConfiguration.KEY)); |
| |
| SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry.getConfiguration( |
| SchemaSynchronizationConfiguration.KEY |
| ); |
| |
| clockService = new ClockServiceImpl(clock, clockWaiter, new SameValueLongSupplier(() -> schemaSyncConfig.maxClockSkew().value())); |
| |
| Consumer<LongFunction<CompletableFuture<?>>> registry = c -> metaStorageMgr.registerRevisionUpdateListener(c::apply); |
| |
| placementDriverMgr = new PlacementDriverManager( |
| name, |
| metaStorageMgr, |
| MetastorageGroupId.INSTANCE, |
| clusterSvc, |
| cmgMgr::metaStorageNodes, |
| logicalTopologyService, |
| raftMgr, |
| topologyAwareRaftGroupServiceFactory, |
| clockService |
| ); |
| |
| ReplicationConfiguration replicationConfig = clusterConfigRegistry.getConfiguration(ReplicationConfiguration.KEY); |
| |
| ReplicaService replicaSvc = new ReplicaService( |
| messagingServiceReturningToStorageOperationsPool, |
| clock, |
| threadPoolsManager.partitionOperationsExecutor(), |
| replicationConfig |
| ); |
| |
| LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig); |
| |
| replicaMgr = new ReplicaManager( |
| name, |
| clusterSvc, |
| cmgMgr, |
| clockService, |
| Set.of(TableMessageGroup.class, TxMessageGroup.class), |
| placementDriverMgr.placementDriver(), |
| threadPoolsManager.partitionOperationsExecutor(), |
| partitionIdleSafeTimePropagationPeriodMsSupplier, |
| failureProcessor |
| ); |
| |
| metricManager.configure(clusterConfigRegistry.getConfiguration(MetricConfiguration.KEY)); |
| |
| restAddressReporter = new RestAddressReporter(workDir); |
| |
| DataStorageModules dataStorageModules = new DataStorageModules( |
| ServiceLoader.load(DataStorageModule.class, serviceProviderClassLoader) |
| ); |
| |
| Path storagePath = getPartitionsStorePath(workDir); |
| |
| GcConfiguration gcConfig = clusterConfigRegistry.getConfiguration(GcConfiguration.KEY); |
| |
| LogSyncer logSyncer = raftMgr.getLogSyncer(); |
| |
| Map<String, StorageEngine> storageEngines = dataStorageModules.createStorageEngines( |
| name, |
| nodeConfigRegistry, |
| storagePath, |
| longJvmPauseDetector, |
| failureProcessor, |
| logSyncer |
| ); |
| |
| dataStorageMgr = new DataStorageManager( |
| applyThreadAssertionsIfNeeded(storageEngines), |
| nodeConfigRegistry.getConfiguration(StorageConfiguration.KEY) |
| ); |
| |
| volatileLogStorageFactoryCreator = new VolatileLogStorageFactoryCreator(name, workDir.resolve("volatile-log-spillout")); |
| |
| outgoingSnapshotsManager = new OutgoingSnapshotsManager(name, clusterSvc.messagingService()); |
| |
| LongSupplier delayDurationMsSupplier = delayDurationMsSupplier(schemaSyncConfig); |
| |
| CatalogManagerImpl catalogManager = new CatalogManagerImpl( |
| new UpdateLogImpl(metaStorageMgr), |
| clockService, |
| delayDurationMsSupplier, |
| partitionIdleSafeTimePropagationPeriodMsSupplier |
| ); |
| |
| systemViewManager = new SystemViewManagerImpl(name, catalogManager); |
| nodeAttributesCollector.register(systemViewManager); |
| logicalTopology.addEventListener(systemViewManager); |
| systemViewManager.register(catalogManager); |
| |
| this.catalogManager = catalogManager; |
| |
| raftMgr.appendEntriesRequestInterceptor(new CheckCatalogVersionOnAppendEntries(catalogManager)); |
| raftMgr.actionRequestInterceptor(new CheckCatalogVersionOnActionRequest(catalogManager)); |
| |
| SchemaSyncService schemaSyncService = new SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier); |
| |
| schemaManager = new SchemaManager(registry, catalogManager); |
| |
| ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, |
| NamedThreadFactory.create(name, "rebalance-scheduler", LOG)); |
| |
| distributionZoneManager = new DistributionZoneManager( |
| name, |
| registry, |
| metaStorageMgr, |
| logicalTopologyService, |
| catalogManager, |
| rebalanceScheduler |
| ); |
| |
| TransactionConfiguration txConfig = clusterConfigRegistry.getConfiguration(TransactionConfiguration.KEY); |
| |
| indexNodeFinishedRwTransactionsChecker = new IndexNodeFinishedRwTransactionsChecker( |
| catalogManager, |
| clusterSvc.messagingService(), |
| clock |
| ); |
| |
| resourcesRegistry = new RemotelyTriggeredResourceRegistry(); |
| |
| lowWatermark = new LowWatermarkImpl( |
| name, |
| gcConfig.lowWatermark(), |
| clockService, |
| vaultMgr, |
| failureProcessor, |
| clusterSvc.messagingService() |
| ); |
| |
| var transactionInflights = new TransactionInflights(placementDriverMgr.placementDriver(), clockService); |
| |
| // TODO: IGNITE-19344 - use nodeId that is validated on join (and probably generated differently). |
| txManager = new TxManagerImpl( |
| name, |
| txConfig, |
| messagingServiceReturningToStorageOperationsPool, |
| clusterSvc.topologyService(), |
| replicaSvc, |
| lockMgr, |
| clockService, |
| new TransactionIdGenerator(() -> clusterSvc.nodeName().hashCode()), |
| placementDriverMgr.placementDriver(), |
| partitionIdleSafeTimePropagationPeriodMsSupplier, |
| indexNodeFinishedRwTransactionsChecker, |
| threadPoolsManager.partitionOperationsExecutor(), |
| resourcesRegistry, |
| transactionInflights, |
| lowWatermark |
| ); |
| |
| resourceVacuumManager = new ResourceVacuumManager( |
| name, |
| resourcesRegistry, |
| clusterSvc.topologyService(), |
| messagingServiceReturningToStorageOperationsPool, |
| transactionInflights, |
| txManager |
| ); |
| |
| StorageUpdateConfiguration storageUpdateConfiguration = clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY); |
| |
| distributedTblMgr = new TableManager( |
| name, |
| registry, |
| gcConfig, |
| txConfig, |
| storageUpdateConfiguration, |
| messagingServiceReturningToStorageOperationsPool, |
| clusterSvc.topologyService(), |
| clusterSvc.serializationRegistry(), |
| raftMgr, |
| replicaMgr, |
| lockMgr, |
| replicaSvc, |
| txManager, |
| dataStorageMgr, |
| storagePath, |
| metaStorageMgr, |
| schemaManager, |
| volatileLogStorageFactoryCreator, |
| threadPoolsManager.tableIoExecutor(), |
| threadPoolsManager.partitionOperationsExecutor(), |
| clock, |
| clockService, |
| outgoingSnapshotsManager, |
| topologyAwareRaftGroupServiceFactory, |
| distributionZoneManager, |
| schemaSyncService, |
| catalogManager, |
| observableTimestampTracker, |
| placementDriverMgr.placementDriver(), |
| this::bareSql, |
| resourcesRegistry, |
| rebalanceScheduler, |
| lowWatermark, |
| transactionInflights |
| ); |
| |
| disasterRecoveryManager = new DisasterRecoveryManager( |
| threadPoolsManager.tableIoExecutor(), |
| messagingServiceReturningToStorageOperationsPool, |
| metaStorageMgr, |
| catalogManager, |
| distributionZoneManager, |
| raftMgr |
| ); |
| |
| indexManager = new IndexManager( |
| schemaManager, |
| distributedTblMgr, |
| catalogManager, |
| threadPoolsManager.tableIoExecutor(), |
| registry, |
| lowWatermark |
| ); |
| |
| indexBuildingManager = new IndexBuildingManager( |
| name, |
| replicaSvc, |
| catalogManager, |
| metaStorageMgr, |
| indexManager, |
| placementDriverMgr.placementDriver(), |
| clusterSvc, |
| logicalTopologyService, |
| clockService |
| ); |
| |
| qryEngine = new SqlQueryProcessor( |
| registry, |
| clusterSvc, |
| logicalTopologyService, |
| distributedTblMgr, |
| schemaManager, |
| dataStorageMgr, |
| replicaSvc, |
| clockService, |
| schemaSyncService, |
| catalogManager, |
| metricManager, |
| systemViewManager, |
| failureProcessor, |
| partitionIdleSafeTimePropagationPeriodMsSupplier, |
| placementDriverMgr.placementDriver(), |
| clusterConfigRegistry.getConfiguration(SqlDistributedConfiguration.KEY), |
| nodeConfigRegistry.getConfiguration(SqlLocalConfiguration.KEY), |
| transactionInflights |
| ); |
| |
| sql = new IgniteSqlImpl(qryEngine, new IgniteTransactionsImpl(txManager, observableTimestampTracker)); |
| |
| var deploymentManagerImpl = new DeploymentManagerImpl( |
| clusterSvc, |
| new DeploymentUnitStoreImpl(metaStorageMgr), |
| logicalTopologyService, |
| workDir, |
| nodeConfigRegistry.getConfiguration(DeploymentConfiguration.KEY), |
| cmgMgr, |
| name |
| ); |
| deploymentManager = deploymentManagerImpl; |
| |
| ComputeConfiguration computeCfg = nodeConfigRegistry.getConfiguration(ComputeConfiguration.KEY); |
| InMemoryComputeStateMachine stateMachine = new InMemoryComputeStateMachine(computeCfg, name); |
| computeComponent = new ComputeComponentImpl( |
| name, |
| clusterSvc.messagingService(), |
| clusterSvc.topologyService(), |
| logicalTopologyService, |
| new JobContextManager(deploymentManagerImpl, deploymentManagerImpl.deploymentUnitAccessor(), new JobClassLoaderFactory()), |
| new ComputeExecutorImpl(this, stateMachine, computeCfg), |
| computeCfg |
| ); |
| |
| compute = new IgniteComputeImpl( |
| placementDriverMgr.placementDriver(), |
| clusterSvc.topologyService(), |
| distributedTblMgr, |
| computeComponent, |
| clock |
| ); |
| |
| authenticationManager = createAuthenticationManager(); |
| |
| ClientConnectorConfiguration clientConnectorConfiguration = nodeConfigRegistry.getConfiguration(ClientConnectorConfiguration.KEY); |
| |
| clientHandlerModule = new ClientHandlerModule( |
| qryEngine, |
| distributedTblMgr, |
| // TODO: IGNITE-20232 The observable timestamp should be different for each client. |
| new IgniteTransactionsImpl(txManager, new HybridTimestampTracker()), |
| compute, |
| clusterSvc, |
| nettyBootstrapFactory, |
| () -> cmgMgr.clusterState().thenApply(s -> s.clusterTag()), |
| metricManager, |
| new ClientHandlerMetricSource(), |
| authenticationManager, |
| clockService, |
| schemaSyncService, |
| catalogManager, |
| placementDriverMgr.placementDriver(), |
| clientConnectorConfiguration, |
| lowWatermark |
| ); |
| |
| restComponent = createRestComponent(name); |
| } |
| |
| private static Map<String, StorageEngine> applyThreadAssertionsIfNeeded(Map<String, StorageEngine> storageEngines) { |
| boolean enabled = ThreadAssertions.enabled(); |
| |
| LOG.info("Thread assertions enablement status: {}", enabled); |
| |
| if (!enabled) { |
| return storageEngines; |
| } |
| |
| Map<String, StorageEngine> decoratedEngines = new HashMap<>(); |
| |
| for (Entry<String, StorageEngine> entry : storageEngines.entrySet()) { |
| decoratedEngines.put(entry.getKey(), new ThreadAssertingStorageEngine(entry.getValue())); |
| } |
| |
| return Map.copyOf(decoratedEngines); |
| } |
| |
| private static SameValueLongSupplier delayDurationMsSupplier(SchemaSynchronizationConfiguration schemaSyncConfig) { |
| return new SameValueLongSupplier(() -> schemaSyncConfig.delayDuration().value()); |
| } |
| |
| private static LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier(ReplicationConfiguration replicationConfig) { |
| return new SameValueLongSupplier(() -> replicationConfig.idleSafeTimePropagationDuration().value()); |
| } |
| |
| private AuthenticationManager createAuthenticationManager() { |
| SecurityConfiguration securityConfiguration = clusterCfgMgr.configurationRegistry() |
| .getConfiguration(SecurityConfiguration.KEY); |
| EventLogConfiguration eventLogConfiguration = clusterCfgMgr.configurationRegistry() |
| .getConfiguration(EventLogConfiguration.KEY); |
| |
| return new AuthenticationManagerImpl(securityConfiguration, new EventLogImpl(eventLogConfiguration)); |
| } |
| |
| private RestComponent createRestComponent(String name) { |
| RestManager restManager = new RestManager(); |
| Supplier<RestFactory> presentationsFactory = () -> new PresentationsFactory(nodeCfgMgr, clusterCfgMgr); |
| Supplier<RestFactory> clusterManagementRestFactory = () -> new ClusterManagementRestFactory(clusterSvc, clusterInitializer, cmgMgr); |
| Supplier<RestFactory> nodeManagementRestFactory = () -> new NodeManagementRestFactory(lifecycleManager, () -> name, |
| new JdbcPortProviderImpl(nodeCfgMgr.configurationRegistry())); |
| Supplier<RestFactory> nodeMetricRestFactory = () -> new MetricRestFactory(metricManager); |
| Supplier<RestFactory> authProviderFactory = () -> new AuthenticationProviderFactory(authenticationManager); |
| Supplier<RestFactory> deploymentCodeRestFactory = () -> new CodeDeploymentRestFactory(deploymentManager); |
| Supplier<RestFactory> restManagerFactory = () -> new RestManagerFactory(restManager); |
| Supplier<RestFactory> computeRestFactory = () -> new ComputeRestFactory(compute); |
| Supplier<RestFactory> disasterRecoveryFactory = () -> new DisasterRecoveryFactory(disasterRecoveryManager); |
| |
| RestConfiguration restConfiguration = nodeCfgMgr.configurationRegistry().getConfiguration(RestConfiguration.KEY); |
| |
| return new RestComponent( |
| List.of(presentationsFactory, |
| clusterManagementRestFactory, |
| nodeManagementRestFactory, |
| nodeMetricRestFactory, |
| deploymentCodeRestFactory, |
| authProviderFactory, |
| restManagerFactory, |
| computeRestFactory, |
| disasterRecoveryFactory |
| ), |
| restManager, |
| restConfiguration |
| ); |
| } |
| |
| private static MessageSerializationRegistry createSerializationRegistry(@Nullable ClassLoader classLoader) { |
| var serviceLoader = new SerializationRegistryServiceLoader(classLoader); |
| |
| var serializationRegistry = new MessageSerializationRegistryImpl(); |
| |
| serviceLoader.registerSerializationFactories(serializationRegistry); |
| |
| return serializationRegistry; |
| } |
| |
| private static ConfigurationModules loadConfigurationModules(@Nullable ClassLoader classLoader) { |
| var modulesProvider = new ServiceLoaderModulesProvider(); |
| List<ConfigurationModule> modules = modulesProvider.modules(classLoader); |
| |
| if (modules.isEmpty()) { |
| throw new IllegalStateException("No configuration modules were loaded, this means Ignite cannot start. " |
| + "Please make sure that the classloader for loading services is correct."); |
| } |
| |
| var configModules = new ConfigurationModules(modules); |
| |
| LOG.info("Configuration modules loaded [modules={}, localRoots={}, distRoots={}]", |
| modules, configModules.local().rootKeys(), configModules.distributed().rootKeys()); |
| |
| return configModules; |
| } |
| |
| /** |
| * Starts ignite node. |
| * |
| * <p>When this method returns, the node is partially started and ready to accept the init command (that is, its |
| * REST endpoint is functional). |
| * |
| * @param configPath Node configuration based on |
| * {@link NetworkConfigurationSchema}. Following rules are used for applying the |
| * configuration properties: |
| * |
| * <ol> |
| * <li>Specified property overrides existing one or just applies itself if it wasn't |
| * previously specified.</li> |
| * <li>All non-specified properties either use previous value or use default one from |
| * corresponding configuration schema.</li> |
| * </ol> |
| * |
| * So that, in case of initial node start (first start ever) specified configuration, supplemented with defaults, is |
| * used. If no configuration was provided defaults are used for all configuration properties. In case of node |
| * restart, specified properties override existing ones, non specified properties that also weren't specified |
| * previously use default values. Please pay attention that previously specified properties are searched in the |
| * {@code workDir} specified by the user. |
| */ |
| public CompletableFuture<Ignite> start(Path configPath) { |
| ExecutorService startupExecutor = Executors.newSingleThreadExecutor( |
| IgniteThreadFactory.create(name, "start", LOG, STORAGE_READ, STORAGE_WRITE) |
| ); |
| |
| try { |
| metricManager.registerSource(new JvmMetricSource()); |
| |
| lifecycleManager.startComponent(longJvmPauseDetector); |
| |
| lifecycleManager.startComponent(vaultMgr); |
| |
| vaultMgr.putName(name); |
| |
| // Node configuration manager startup. |
| lifecycleManager.startComponent(nodeCfgMgr); |
| |
| // Start the components that are required to join the cluster. |
| lifecycleManager.startComponents( |
| threadPoolsManager, |
| clockWaiter, |
| failureProcessor, |
| criticalWorkerRegistry, |
| nettyBootstrapFactory, |
| nettyWorkersRegistrar, |
| clusterSvc, |
| restComponent, |
| raftMgr, |
| clusterStateStorage, |
| cmgMgr, |
| lowWatermark |
| ); |
| |
| clusterSvc.updateMetadata(new NodeMetadata(restComponent.hostName(), restComponent.httpPort(), restComponent.httpsPort())); |
| |
| restAddressReporter.writeReport(restHttpAddress(), restHttpsAddress()); |
| |
| LOG.info("Components started, joining the cluster"); |
| |
| return cmgMgr.joinFuture() |
| // Disable REST component during initialization. |
| .thenAcceptAsync(unused -> restComponent.disable(), startupExecutor) |
| .thenComposeAsync(unused -> { |
| LOG.info("Join complete, starting MetaStorage"); |
| |
| try { |
| lifecycleManager.startComponent(metaStorageMgr); |
| } catch (NodeStoppingException e) { |
| throw new CompletionException(e); |
| } |
| |
| return metaStorageMgr.recoveryFinishedFuture(); |
| }, startupExecutor) |
| .thenComposeAsync(unused -> initializeClusterConfiguration(startupExecutor), startupExecutor) |
| .thenRunAsync(() -> { |
| LOG.info("MetaStorage started, starting the remaining components"); |
| |
| // Start all other components after the join request has completed and the node has been validated. |
| try { |
| lifecycleManager.startComponents( |
| catalogManager, |
| clusterCfgMgr, |
| authenticationManager, |
| placementDriverMgr, |
| metricManager, |
| distributionZoneManager, |
| computeComponent, |
| replicaMgr, |
| indexNodeFinishedRwTransactionsChecker, |
| txManager, |
| dataStorageMgr, |
| schemaManager, |
| volatileLogStorageFactoryCreator, |
| outgoingSnapshotsManager, |
| distributedTblMgr, |
| disasterRecoveryManager, |
| indexManager, |
| indexBuildingManager, |
| qryEngine, |
| clientHandlerModule, |
| deploymentManager, |
| sql, |
| resourceVacuumManager |
| ); |
| |
| // The system view manager comes last because other components |
| // must register system views before it starts. |
| lifecycleManager.startComponent(systemViewManager); |
| } catch (NodeStoppingException e) { |
| throw new CompletionException(e); |
| } |
| }, startupExecutor) |
| .thenComposeAsync(v -> { |
| LOG.info("Components started, performing recovery"); |
| |
| return recoverComponentsStateOnStart(startupExecutor, lifecycleManager.allComponentsStartFuture()); |
| }, startupExecutor) |
| .thenComposeAsync(v -> clusterCfgMgr.configurationRegistry().onDefaultsPersisted(), startupExecutor) |
| // Signal that local recovery is complete and the node is ready to join the cluster. |
| .thenComposeAsync(v -> { |
| LOG.info("Recovery complete, finishing join"); |
| |
| return cmgMgr.onJoinReady(); |
| }, startupExecutor) |
| .thenComposeAsync(ignored -> awaitSelfInLocalLogicalTopology(), startupExecutor) |
| .thenCompose(ignored -> catalogManager.catalogInitializationFuture()) |
| .thenRunAsync(() -> { |
| try { |
| // Enable watermark events. |
| lowWatermark.scheduleUpdates(); |
| |
| // Enable REST component on start complete. |
| restComponent.enable(); |
| // Transfer the node to the STARTED state. |
| lifecycleManager.onStartComplete(); |
| } catch (NodeStoppingException e) { |
| throw new CompletionException(e); |
| } |
| }, startupExecutor) |
| .handleAsync((v, e) -> { |
| if (e != null) { |
| throw handleStartException(e); |
| } |
| |
| return (Ignite) this; |
| }, startupExecutor) |
| // Moving to the common pool on purpose to close the startup pool and proceed user's code in the common pool. |
| .whenCompleteAsync((res, ex) -> { |
| startupExecutor.shutdownNow(); |
| }); |
| } catch (Throwable e) { |
| startupExecutor.shutdownNow(); |
| |
| throw handleStartException(e); |
| } |
| } |
| |
| private CompletableFuture<Void> awaitSelfInLocalLogicalTopology() { |
| CompletableFuture<Void> awaitSelfInLogicalTopologyFuture = new CompletableFuture<>(); |
| |
| LogicalTopologyEventListener awaitSelfListener = new LogicalTopologyEventListener() { |
| @Override |
| public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot newTopology) { |
| checkSelfInTopology(newTopology, awaitSelfInLogicalTopologyFuture, this); |
| } |
| |
| @Override |
| public void onTopologyLeap(LogicalTopologySnapshot newTopology) { |
| checkSelfInTopology(newTopology, awaitSelfInLogicalTopologyFuture, this); |
| } |
| }; |
| |
| logicalTopologyService.addEventListener(awaitSelfListener); |
| |
| checkSelfInTopology(logicalTopologyService.localLogicalTopology(), awaitSelfInLogicalTopologyFuture, awaitSelfListener); |
| |
| return awaitSelfInLogicalTopologyFuture; |
| } |
| |
| private void checkSelfInTopology( |
| LogicalTopologySnapshot logicalTopologySnapshot, |
| CompletableFuture<Void> awaitSelfInLogicalTopologyFuture, |
| LogicalTopologyEventListener awaitSelfListener |
| ) { |
| if (logicalTopologySnapshot.nodes().stream().map(LogicalNode::id).collect(toSet()).contains(id())) { |
| awaitSelfInLogicalTopologyFuture.complete(null); |
| logicalTopologyService.removeEventListener(awaitSelfListener); |
| } |
| } |
| |
| private RuntimeException handleStartException(Throwable e) { |
| String errMsg = "Unable to start [node=" + name + "]"; |
| |
| LOG.debug(errMsg, e); |
| |
| IgniteException igniteException = new IgniteException(errMsg, e); |
| |
| try { |
| lifecycleManager.stopNode().get(); |
| } catch (Exception ex) { |
| igniteException.addSuppressed(ex); |
| } |
| |
| return igniteException; |
| } |
| |
| /** |
| * Synchronously stops ignite node. |
| */ |
| public void stop() throws ExecutionException, InterruptedException { |
| stopAsync().get(); |
| } |
| |
| /** |
| * Asynchronously stops ignite node. |
| */ |
| public CompletableFuture<Void> stopAsync() { |
| return lifecycleManager.stopNode() |
| .whenComplete((unused, throwable) -> restAddressReporter.removeReport()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public IgniteTables tables() { |
| return new PublicApiThreadingIgniteTables(distributedTblMgr, asyncContinuationExecutor); |
| } |
| |
| public DisasterRecoveryManager disasterRecoveryManager() { |
| return disasterRecoveryManager; |
| } |
| |
| @TestOnly |
| public QueryProcessor queryEngine() { |
| return qryEngine; |
| } |
| |
| @TestOnly |
| public SystemViewManager systemViewManager() { |
| return systemViewManager; |
| } |
| |
| @TestOnly |
| public MetaStorageManager metaStorageManager() { |
| return metaStorageMgr; |
| } |
| |
| @TestOnly |
| public FailureProcessor failureProcessor() { |
| return failureProcessor; |
| } |
| |
| @TestOnly |
| public MetricManager metricManager() { |
| return metricManager; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public IgniteTransactions transactions() { |
| IgniteTransactionsImpl transactions = new IgniteTransactionsImpl(txManager, observableTimestampTracker); |
| return new PublicApiThreadingIgniteTransactions(transactions, asyncContinuationExecutor); |
| } |
| |
| private IgniteSql bareSql() { |
| return sql; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public IgniteSql sql() { |
| return new PublicApiThreadingIgniteSql(sql, asyncContinuationExecutor); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void close() { |
| IgnitionManager.stop(name); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public String name() { |
| return name; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public IgniteCompute compute() { |
| return new AntiHijackIgniteCompute(compute, asyncContinuationExecutor); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public Collection<ClusterNode> clusterNodes() { |
| return clusterSvc.topologyService().allMembers(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public CompletableFuture<Collection<ClusterNode>> clusterNodesAsync() { |
| return completedFuture(clusterNodes()); |
| } |
| |
| @Override |
| public IgniteCatalog catalog(Options options) { |
| return new IgniteCatalogSqlImpl(sql, options); |
| } |
| |
| /** |
| * Returns node configuration. |
| */ |
| public ConfigurationRegistry nodeConfiguration() { |
| return nodeCfgMgr.configurationRegistry(); |
| } |
| |
| /** |
| * Returns cluster configuration. |
| */ |
| public ConfigurationRegistry clusterConfiguration() { |
| return clusterCfgMgr.configurationRegistry(); |
| } |
| |
| /** |
| * Returns the id of the current node. |
| */ |
| // TODO: should be encapsulated in local properties, see https://issues.apache.org/jira/browse/IGNITE-15131 |
| public String id() { |
| return clusterSvc.topologyService().localMember().id(); |
| } |
| |
| /** |
| * Returns the local HTTP address of REST endpoints. |
| * |
| * @return address or null if HTTP is not enabled. |
| * @throws IgniteInternalException if the REST module is not started. |
| */ |
| // TODO: should be encapsulated in local properties, see https://issues.apache.org/jira/browse/IGNITE-15131 |
| @Nullable |
| public NetworkAddress restHttpAddress() { |
| String host = restComponent.hostName(); |
| int port = restComponent.httpPort(); |
| if (port != -1) { |
| return new NetworkAddress(host, port); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Returns the local HTTPS address of REST endpoints. |
| * |
| * @return address or null if HTTPS is not enabled. |
| * @throws IgniteInternalException if the REST module is not started. |
| */ |
| // TODO: should be encapsulated in local properties, see https://issues.apache.org/jira/browse/IGNITE-15131 |
| @Nullable |
| public NetworkAddress restHttpsAddress() { |
| String host = restComponent.hostName(); |
| int port = restComponent.httpsPort(); |
| if (port != -1) { |
| return new NetworkAddress(host, port); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Returns the local address of the Thin Client. |
| * |
| * @throws IgniteInternalException if the Client module is not started. |
| */ |
| // TODO: should be encapsulated in local properties, see https://issues.apache.org/jira/browse/IGNITE-15131 |
| public NetworkAddress clientAddress() { |
| return NetworkAddress.from(clientHandlerModule.localAddress()); |
| } |
| |
| /** |
| * Initializes the cluster that this node is present in. |
| * |
| * @param metaStorageNodeNames names of nodes that will host the Meta Storage. |
| * @param cmgNodeNames names of nodes that will host the CMG. |
| * @param clusterName Human-readable name of a cluster. |
| * @param clusterConfiguration cluster configuration, that will be applied after init. |
| * @throws NodeStoppingException If node stopping intention was detected. |
| */ |
| public void init( |
| Collection<String> metaStorageNodeNames, |
| Collection<String> cmgNodeNames, |
| String clusterName, |
| String clusterConfiguration |
| ) throws NodeStoppingException { |
| cmgMgr.initCluster(metaStorageNodeNames, cmgNodeNames, clusterName, clusterConfiguration); |
| } |
| |
| /** |
| * Checks if the local revision is {@code 0} and initializes the cluster configuration with the specified user-provided configuration |
| * upon cluster initialization. If the local revision is not {@code 0}, does nothing. |
| */ |
| private CompletableFuture<Void> initializeClusterConfiguration(ExecutorService startupExecutor) { |
| return cfgStorage.localRevision() |
| .thenComposeAsync(appliedRevision -> { |
| if (appliedRevision != 0) { |
| return nullCompletedFuture(); |
| } else { |
| return cmgMgr.initialClusterConfigurationFuture() |
| .thenAcceptAsync(initialConfigHocon -> { |
| if (initialConfigHocon == null) { |
| return; |
| } |
| |
| Config config = ConfigFactory.parseString(initialConfigHocon); |
| ConfigurationSource hoconSource = HoconConverter.hoconSource(config.root()); |
| clusterCfgMgr.configurationRegistry().initializeConfigurationWith(hoconSource); |
| }, startupExecutor); |
| } |
| }, startupExecutor); |
| } |
| |
| /** |
| * Recovers components state on start by invoking configuration listeners ({@link #notifyConfigurationListeners()} and deploying watches |
| * after that. |
| */ |
| private CompletableFuture<?> recoverComponentsStateOnStart(ExecutorService startupExecutor, CompletableFuture<Void> startFuture) { |
| CompletableFuture<Void> startupConfigurationUpdate = notifyConfigurationListeners(); |
| CompletableFuture<Void> startupRevisionUpdate = metaStorageMgr.notifyRevisionUpdateListenerOnStart(); |
| |
| return CompletableFuture.allOf(startupConfigurationUpdate, startupRevisionUpdate, startFuture) |
| .thenComposeAsync(t -> { |
| // Deploy all registered watches because all components are ready and have registered their listeners. |
| return metaStorageMgr.deployWatches(); |
| }, startupExecutor); |
| } |
| |
| /** |
| * Notify all listeners of current configurations. |
| */ |
| private CompletableFuture<Void> notifyConfigurationListeners() { |
| return CompletableFuture.allOf( |
| nodeConfiguration().notifyCurrentConfigurationListeners(), |
| clusterConfiguration().notifyCurrentConfigurationListeners() |
| ); |
| } |
| |
| /** |
| * Starts the Vault component. |
| */ |
| private static VaultManager createVault(Path workDir) { |
| Path vaultPath = workDir.resolve(VAULT_DB_PATH); |
| |
| try { |
| Files.createDirectories(vaultPath); |
| } catch (IOException e) { |
| throw new IgniteInternalException(e); |
| } |
| |
| return new VaultManager(new PersistentVaultService(vaultPath)); |
| } |
| |
| /** |
| * Returns a path to the partitions store directory. Creates a directory if it doesn't exist. |
| * |
| * @param workDir Ignite work directory. |
| * @return Partitions store path. |
| */ |
| private static Path getPartitionsStorePath(Path workDir) { |
| Path partitionsStore = workDir.resolve(PARTITIONS_STORE_PATH); |
| |
| try { |
| Files.createDirectories(partitionsStore); |
| } catch (IOException e) { |
| throw new IgniteInternalException("Failed to create directory for partitions storage: " + e.getMessage(), e); |
| } |
| |
| return partitionsStore; |
| } |
| |
| @TestOnly |
| public Loza raftManager() { |
| return raftMgr; |
| } |
| |
| @TestOnly |
| public ClusterNode node() { |
| return clusterSvc.topologyService().localMember(); |
| } |
| |
| @TestOnly |
| public DistributionZoneManager distributionZoneManager() { |
| return distributionZoneManager; |
| } |
| |
| @TestOnly |
| public LogicalTopologyService logicalTopologyService() { |
| return logicalTopologyService; |
| } |
| |
| @TestOnly |
| public IgniteDeployment deployment() { |
| return deploymentManager; |
| } |
| |
| // TODO: IGNITE-18493 - remove/move this |
| @TestOnly |
| public void dropMessages(BiPredicate<String, NetworkMessage> predicate) { |
| ((DefaultMessagingService) clusterSvc.messagingService()).dropMessages(predicate); |
| } |
| |
| // TODO: IGNITE-18493 - remove/move this |
| @TestOnly |
| @Nullable |
| public BiPredicate<String, NetworkMessage> dropMessagesPredicate() { |
| return ((DefaultMessagingService) clusterSvc.messagingService()).dropMessagesPredicate(); |
| } |
| |
| // TODO IGNITE-18493 - remove/move this |
| @TestOnly |
| public void sendFakeMessage(String recipientConsistentId, NetworkMessage msg) { |
| clusterSvc.messagingService().send(recipientConsistentId, ChannelType.DEFAULT, msg); |
| } |
| |
| // TODO: IGNITE-18493 - remove/move this |
| @TestOnly |
| public void stopDroppingMessages() { |
| ((DefaultMessagingService) clusterSvc.messagingService()).stopDroppingMessages(); |
| } |
| |
| /** Returns the node's hybrid clock. */ |
| @TestOnly |
| public HybridClock clock() { |
| return clock; |
| } |
| |
| @TestOnly |
| public ClockService clockService() { |
| return clockService; |
| } |
| |
| /** Returns the node's transaction manager. */ |
| @TestOnly |
| public TxManager txManager() { |
| return txManager; |
| } |
| |
| /** Returns the node's placement driver service. */ |
| @TestOnly |
| public PlacementDriver placementDriver() { |
| return placementDriverMgr.placementDriver(); |
| } |
| |
| /** Returns the node's catalog manager. */ |
| @TestOnly |
| public CatalogManager catalogManager() { |
| return catalogManager; |
| } |
| |
| /** Returns the cluster's configuration manager. */ |
| @TestOnly |
| public ConfigurationRegistry clusterConfigurationRegistry() { |
| return clusterCfgMgr.configurationRegistry(); |
| } |
| |
| /** |
| * Returns {@link NettyBootstrapFactory}. |
| */ |
| @TestOnly |
| public NettyBootstrapFactory nettyBootstrapFactory() { |
| return nettyBootstrapFactory; |
| } |
| |
| /** Returns cluster service (cluster network manager). */ |
| @TestOnly |
| public ClusterService clusterService() { |
| return clusterSvc; |
| } |
| |
| /** Returns resources registry. */ |
| @TestOnly |
| public RemotelyTriggeredResourceRegistry resourcesRegistry() { |
| return resourcesRegistry; |
| } |
| |
| /** Returns low watermark. */ |
| @TestOnly |
| public LowWatermarkImpl lowWatermark() { |
| return lowWatermark; |
| } |
| } |