/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.runner.app;
import static java.util.Collections.emptySet;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertionsAsync;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.apache.ignite.sql.ColumnType.INT32;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.InitParameters;
import org.apache.ignite.internal.BaseIgniteRestartTest;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.app.ThreadPoolsManager;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.commands.AlterZoneCommand;
import org.apache.ignite.internal.catalog.commands.AlterZoneCommandBuilder;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
import org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
import org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.ConfigurationModules;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
import org.apache.ignite.internal.configuration.NodeConfigWriteException;
import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
import org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.ClockServiceImpl;
import org.apache.ignite.internal.hlc.ClockWaiter;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.lowwatermark.LowWatermarkImpl;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.NettyWorkersRegistrar;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.recovery.VaultStaleIds;
import org.apache.ignite.internal.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.internal.network.wrapper.JumpToExecutorByConsistentIdAfterSend;
import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
import org.apache.ignite.internal.sql.api.IgniteSqlImpl;
import org.apache.ignite.internal.sql.configuration.distributed.SqlDistributedConfiguration;
import org.apache.ignite.internal.sql.configuration.local.SqlLocalConfiguration;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModule;
import org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.ResourceVacuumManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.worker.CriticalWorkerWatchdog;
import org.apache.ignite.internal.worker.configuration.CriticalWorkersConfiguration;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;
import org.awaitility.Awaitility;
import org.intellij.lang.annotations.Language;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
/**
* These tests check node restart scenarios.
*/
@ExtendWith(ConfigurationExtension.class)
@Timeout(120)
public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
/** Value producer for table data; used to create data and verify it later. */
private static final IntFunction<String> VALUE_PRODUCER = i -> "val " + i;
/** Test table name. */
private static final String TABLE_NAME = "Table1";
/** Assumes that the table ID is always 8 for the test table. There is an assertion that checks whether this holds. */
private static final int TABLE_ID = 8;
/** Second test table name. */
private static final String TABLE_NAME_2 = "Table2";
@InjectConfiguration("mock: " + RAFT_CFG)
private static RaftConfiguration raftConfiguration;
@InjectConfiguration
private static ClusterManagementConfiguration clusterManagementConfiguration;
@InjectConfiguration
private static NodeAttributesConfiguration nodeAttributes;
@InjectConfiguration
private static StorageConfiguration storageConfiguration;
@InjectConfiguration
private static MetaStorageConfiguration metaStorageConfiguration;
@InjectConfiguration
private static TransactionConfiguration txConfiguration;
@InjectConfiguration
private static StorageUpdateConfiguration storageUpdateConfiguration;
@InjectConfiguration
private CriticalWorkersConfiguration workersConfiguration;
@InjectConfiguration
private ReplicationConfiguration replicationConfiguration;
/**
* Interceptor of {@link MetaStorageManager#invoke(Condition, Collection, Collection)}.
*/
private final Map<Integer, InvokeInterceptor> metaStorageInvokeInterceptorByNode = new ConcurrentHashMap<>();
/**
* Mocks the data nodes returned by the {@link DistributionZoneManager#dataNodes(long, int, int)} method on different nodes.
*/
private final Map<Integer, Supplier<CompletableFuture<Set<String>>>> dataNodesMockByNode = new ConcurrentHashMap<>();
@BeforeEach
public void beforeTest() {
metaStorageInvokeInterceptorByNode.clear();
dataNodesMockByNode.clear();
partialNodes.clear();
}
/**
* Starts a subset of Ignite components that is able to serve as an Ignite node for test purposes.
*
* @param idx Node index.
* @param cfgString Configuration string or {@code null} to use the default configuration.
* @return Partial node.
*/
private PartialNode startPartialNode(
int idx,
@Nullable @Language("HOCON") String cfgString
) {
String name = testNodeName(testInfo, idx);
Path dir = workDir.resolve(name);
List<IgniteComponent> components = new ArrayList<>();
VaultManager vault = createVault(dir);
ConfigurationModules modules = loadConfigurationModules(log, Thread.currentThread().getContextClassLoader());
Path configFile = workDir.resolve(TestIgnitionManager.DEFAULT_CONFIG_NAME);
String configString = cfgString == null ? configurationString(idx) : cfgString;
try {
Files.writeString(configFile, configString);
} catch (IOException e) {
throw new NodeConfigWriteException("Failed to write config content to file.", e);
}
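// Node-local configuration: a tree generator over the local root keys and a configuration manager backed by the HOCON file written above.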
var localConfigurationGenerator = new ConfigurationTreeGenerator(
modules.local().rootKeys(),
modules.local().schemaExtensions(),
modules.local().polymorphicSchemaExtensions()
);
var nodeCfgMgr = new ConfigurationManager(
modules.local().rootKeys(),
new LocalFileConfigurationStorage(configFile, localConfigurationGenerator, modules.local()),
localConfigurationGenerator,
ConfigurationValidatorImpl.withDefaultValidators(localConfigurationGenerator, modules.local().validators())
);
NetworkConfiguration networkConfiguration = nodeCfgMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY);
var threadPoolsManager = new ThreadPoolsManager(name);
var failureProcessor = new FailureProcessor(name);
var workerRegistry = new CriticalWorkerWatchdog(workersConfiguration, threadPoolsManager.commonScheduler(), failureProcessor);
var nettyBootstrapFactory = new NettyBootstrapFactory(networkConfiguration, name);
var nettyWorkersRegistrar = new NettyWorkersRegistrar(
workerRegistry,
threadPoolsManager.commonScheduler(),
nettyBootstrapFactory,
workersConfiguration,
failureProcessor
);
var clusterSvc = new TestScaleCubeClusterServiceFactory().createClusterService(
name,
networkConfiguration,
nettyBootstrapFactory,
defaultSerializationRegistry(),
new VaultStaleIds(vault),
workerRegistry,
failureProcessor
);
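// Raft manager (Loza) on top of the test cluster service; the hybrid clock created here is shared by the time-dependent components below.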
var hybridClock = new HybridClockImpl();
var raftGroupEventsClientListener = new RaftGroupEventsClientListener();
var raftMgr = new Loza(clusterSvc, raftConfiguration, dir, hybridClock, raftGroupEventsClientListener);
var clusterStateStorage = new RocksDbClusterStateStorage(dir.resolve("cmg"), name);
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var clusterInitializer = new ClusterInitializer(
clusterSvc,
hocon -> hocon,
new TestConfigurationValidator()
);
var cmgManager = new ClusterManagementGroupManager(
vault,
clusterSvc,
clusterInitializer,
raftMgr,
clusterStateStorage,
logicalTopology,
clusterManagementConfiguration,
new NodeAttributesCollector(nodeAttributes,
nodeCfgMgr.configurationRegistry().getConfiguration(StorageConfiguration.KEY))
);
LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier
= () -> TestIgnitionManager.DEFAULT_PARTITION_IDLE_SYNC_TIME_INTERVAL_MS;
MessagingService messagingServiceReturningToStorageOperationsPool = new JumpToExecutorByConsistentIdAfterSend(
clusterSvc.messagingService(),
name,
message -> threadPoolsManager.partitionOperationsExecutor()
);
var replicaService = new ReplicaService(
messagingServiceReturningToStorageOperationsPool,
hybridClock,
threadPoolsManager.partitionOperationsExecutor(),
replicationConfiguration
);
var lockManager = new HeapLockManager();
var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);
var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
clusterSvc,
logicalTopologyService,
Loza.FACTORY,
raftGroupEventsClientListener
);
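// Meta Storage backed by RocksDB; invoke() is overridden below so that tests can intercept conditional updates on a per-node basis.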
var metaStorage = new RocksDbKeyValueStorage(name, dir.resolve("metastorage"), new NoOpFailureProcessor(name));
InvokeInterceptor metaStorageInvokeInterceptor = metaStorageInvokeInterceptorByNode.get(idx);
var metaStorageMgr = new MetaStorageManagerImpl(
clusterSvc,
cmgManager,
logicalTopologyService,
raftMgr,
metaStorage,
hybridClock,
topologyAwareRaftGroupServiceFactory,
metaStorageConfiguration
) {
@Override
public CompletableFuture<Boolean> invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) {
if (metaStorageInvokeInterceptor != null) {
var res = metaStorageInvokeInterceptor.invoke(condition, success, failure);
if (res != null) {
return completedFuture(res);
}
}
return super.invoke(condition, success, failure);
}
};
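// Cluster-wide (distributed) configuration stored in the Meta Storage.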
var cfgStorage = new DistributedConfigurationStorage("test", metaStorageMgr);
ConfigurationTreeGenerator distributedConfigurationGenerator = new ConfigurationTreeGenerator(
modules.distributed().rootKeys(),
modules.distributed().schemaExtensions(),
modules.distributed().polymorphicSchemaExtensions()
);
var clusterCfgMgr = new ConfigurationManager(
modules.distributed().rootKeys(),
cfgStorage,
distributedConfigurationGenerator,
ConfigurationValidatorImpl.withDefaultValidators(distributedConfigurationGenerator, modules.distributed().validators())
);
ConfigurationRegistry clusterConfigRegistry = clusterCfgMgr.configurationRegistry();
var clockWaiter = new ClockWaiter(name, hybridClock);
SchemaSynchronizationConfiguration schemaSyncConfiguration = clusterConfigRegistry.getConfiguration(
SchemaSynchronizationConfiguration.KEY
);
ClockService clockService = new ClockServiceImpl(
hybridClock,
clockWaiter,
() -> schemaSyncConfiguration.maxClockSkew().value()
);
var placementDriverManager = new PlacementDriverManager(
name,
metaStorageMgr,
MetastorageGroupId.INSTANCE,
clusterSvc,
cmgManager::metaStorageNodes,
logicalTopologyService,
raftMgr,
topologyAwareRaftGroupServiceFactory,
clockService
);
ReplicaManager replicaMgr = new ReplicaManager(
name,
clusterSvc,
cmgManager,
clockService,
Set.of(TableMessageGroup.class, TxMessageGroup.class),
placementDriverManager.placementDriver(),
threadPoolsManager.partitionOperationsExecutor(),
partitionIdleSafeTimePropagationPeriodMsSupplier,
failureProcessor
);
var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
GcConfiguration gcConfig = clusterConfigRegistry.getConfiguration(GcConfiguration.KEY);
var lowWatermark = new LowWatermarkImpl(
name,
gcConfig.lowWatermark(),
clockService,
vault,
failureProcessor,
clusterSvc.messagingService()
);
TransactionInflights transactionInflights = new TransactionInflights(placementDriverManager.placementDriver(), clockService);
var txManager = new TxManagerImpl(
name,
txConfiguration,
messagingServiceReturningToStorageOperationsPool,
clusterSvc.topologyService(),
replicaService,
lockManager,
clockService,
new TransactionIdGenerator(idx),
placementDriverManager.placementDriver(),
partitionIdleSafeTimePropagationPeriodMsSupplier,
new TestLocalRwTxCounter(),
threadPoolsManager.partitionOperationsExecutor(),
resourcesRegistry,
transactionInflights,
lowWatermark
);
ResourceVacuumManager resourceVacuumManager = new ResourceVacuumManager(
name,
resourcesRegistry,
clusterSvc.topologyService(),
clusterSvc.messagingService(),
transactionInflights,
txManager
);
Consumer<LongFunction<CompletableFuture<?>>> registry = (c) -> metaStorageMgr.registerRevisionUpdateListener(c::apply);
DataStorageModules dataStorageModules = new DataStorageModules(
ServiceLoader.load(DataStorageModule.class)
);
Path storagePath = getPartitionsStorePath(dir);
DataStorageManager dataStorageManager = new DataStorageManager(
dataStorageModules.createStorageEngines(
name,
nodeCfgMgr.configurationRegistry(),
storagePath,
null,
failureProcessor,
raftMgr.getLogSyncer()
),
nodeCfgMgr.configurationRegistry().getConfiguration(StorageConfiguration.KEY)
);
TransactionConfiguration txConfiguration = clusterConfigRegistry.getConfiguration(TransactionConfiguration.KEY);
LongSupplier delayDurationMsSupplier = () -> TestIgnitionManager.DEFAULT_DELAY_DURATION_MS;
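// Catalog manager that persists its update log in the Meta Storage.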
var catalogManager = new CatalogManagerImpl(
new UpdateLogImpl(metaStorageMgr),
clockService,
delayDurationMsSupplier,
partitionIdleSafeTimePropagationPeriodMsSupplier
);
SchemaManager schemaManager = new SchemaManager(registry, catalogManager);
var dataNodesMock = dataNodesMockByNode.get(idx);
ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
NamedThreadFactory.create(name, "test-rebalance-scheduler", logger()));
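// Distribution zone manager; dataNodes() is overridden below so that tests can mock data nodes on a per-node basis.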
DistributionZoneManager distributionZoneManager = new DistributionZoneManager(
name,
registry,
metaStorageMgr,
logicalTopologyService,
catalogManager,
rebalanceScheduler
) {
@Override
public CompletableFuture<Set<String>> dataNodes(long causalityToken, int catalogVersion, int zoneId) {
if (dataNodesMock != null) {
return dataNodesMock.get();
}
return super.dataNodes(causalityToken, catalogVersion, zoneId);
}
};
var schemaSyncService = new SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier);
var sqlRef = new AtomicReference<IgniteSqlImpl>();
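// Table manager wired with the components above; the SQL API is supplied lazily via sqlRef because it is created later.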
TableManager tableManager = new TableManager(
name,
registry,
gcConfig,
txConfiguration,
storageUpdateConfiguration,
messagingServiceReturningToStorageOperationsPool,
clusterSvc.topologyService(),
clusterSvc.serializationRegistry(),
raftMgr,
replicaMgr,
lockManager,
replicaService,
txManager,
dataStorageManager,
storagePath,
metaStorageMgr,
schemaManager,
view -> new LocalLogStorageFactory(),
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
hybridClock,
clockService,
new OutgoingSnapshotsManager(clusterSvc.messagingService()),
topologyAwareRaftGroupServiceFactory,
distributionZoneManager,
schemaSyncService,
catalogManager,
new HybridTimestampTracker(),
placementDriverManager.placementDriver(),
sqlRef::get,
resourcesRegistry,
rebalanceScheduler,
lowWatermark,
transactionInflights
);
var indexManager = new IndexManager(
schemaManager,
tableManager,
catalogManager,
threadPoolsManager.tableIoExecutor(),
registry,
lowWatermark
);
var metricManager = new MetricManager();
SqlQueryProcessor qryEngine = new SqlQueryProcessor(
registry,
clusterSvc,
logicalTopologyService,
tableManager,
schemaManager,
dataStorageManager,
replicaService,
clockService,
schemaSyncService,
catalogManager,
metricManager,
new SystemViewManagerImpl(name, catalogManager),
failureProcessor,
partitionIdleSafeTimePropagationPeriodMsSupplier,
placementDriverManager.placementDriver(),
clusterConfigRegistry.getConfiguration(SqlDistributedConfiguration.KEY),
nodeCfgMgr.configurationRegistry().getConfiguration(SqlLocalConfiguration.KEY),
transactionInflights
);
sqlRef.set(new IgniteSqlImpl(qryEngine, new IgniteTransactionsImpl(txManager, new HybridTimestampTracker())));
// Prepare the list of started components (returned as part of the partial node).
components.add(vault);
components.add(nodeCfgMgr);
// Start.
vault.start();
vault.putName(name);
nodeCfgMgr.start();
// Start the remaining components.
List<IgniteComponent> otherComponents = List.of(
threadPoolsManager,
failureProcessor,
workerRegistry,
nettyBootstrapFactory,
nettyWorkersRegistrar,
clusterSvc,
raftMgr,
clusterStateStorage,
cmgManager,
replicaMgr,
txManager,
resourceVacuumManager,
lowWatermark,
metaStorageMgr,
clusterCfgMgr,
dataStorageManager,
clockWaiter,
catalogManager,
schemaManager,
distributionZoneManager,
tableManager,
indexManager,
qryEngine,
sqlRef.get()
);
for (IgniteComponent component : otherComponents) {
component.start();
components.add(component);
}
lowWatermark.scheduleUpdates();
PartialNode partialNode = partialNode(
name,
nodeCfgMgr,
clusterCfgMgr,
metaStorageMgr,
components,
localConfigurationGenerator,
logicalTopology,
cfgStorage,
distributedConfigurationGenerator,
clusterConfigRegistry,
hybridClock
);
partialNodes.add(partialNode);
return partialNode;
}
/**
* Returns a path to the partitions store directory. Creates the directory if it doesn't exist.
*
* @param workDir Ignite work directory.
* @return Partitions store path.
*/
private static Path getPartitionsStorePath(Path workDir) {
Path partitionsStore = workDir.resolve(Paths.get("db"));
try {
Files.createDirectories(partitionsStore);
} catch (IOException e) {
throw new IgniteInternalException("Failed to create directory for partitions storage: " + e.getMessage(), e);
}
return partitionsStore;
}
/**
* Starts {@code amount} nodes (with sequential indices starting from 0).
*/
private List<IgniteImpl> startNodes(int amount) {
boolean initNeeded = CLUSTER_NODES_NAMES.isEmpty();
List<CompletableFuture<Ignite>> futures = IntStream.range(0, amount)
.mapToObj(i -> startNodeAsync(i, null))
.collect(toList());
if (initNeeded) {
String nodeName = CLUSTER_NODES_NAMES.get(0);
InitParameters initParameters = InitParameters.builder()
.destinationNodeName(nodeName)
.metaStorageNodeNames(List.of(nodeName))
.clusterName("cluster")
.build();
TestIgnitionManager.init(initParameters);
}
return futures.stream()
.map(future -> {
assertThat(future, willCompleteSuccessfully());
return (IgniteImpl) future.join();
})
.collect(toList());
}
/**
* Restarts an empty node.
*/
@Test
public void emptyNodeTest() {
IgniteImpl ignite = startNode(0);
int nodePort = ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).port().value();
assertEquals(DEFAULT_NODE_PORT, nodePort);
stopNode(0);
ignite = startNode(0);
nodePort = ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).port().value();
assertEquals(DEFAULT_NODE_PORT, nodePort);
}
/**
* Checks correctness of returned results after a node restart.
* Scenario:
* <ol>
* <li>Start two nodes and fill them with data.</li>
* <li>Create an index.</li>
* <li>Check that the explain plan contains an index scan.</li>
* <li>Check the returned results.</li>
* <li>Restart one node.</li>
* <li>Run query and compare results.</li>
* </ol>
*/
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-19091")
public void testQueryCorrectnessAfterNodeRestart() throws InterruptedException {
IgniteImpl ignite1 = startNode(0);
createTableWithoutData(ignite1, TABLE_NAME, 2, 1);
IgniteImpl ignite2 = startNode(1);
String sql = "SELECT id FROM " + TABLE_NAME + " WHERE id > 0 ORDER BY id";
int intRes;
IgniteSql sql1 = ignite1.sql();
IgniteSql sql2 = ignite2.sql();
createTableWithData(List.of(ignite1), TABLE_NAME, 2, 1);
sql1.execute(null, "CREATE INDEX idx1 ON " + TABLE_NAME + "(id)");
waitForIndexToBecomeAvailable(List.of(ignite1, ignite2), "idx1");
ResultSet<SqlRow> plan = sql1.execute(null, "EXPLAIN PLAN FOR " + sql);
String planStr = plan.next().stringValue(0);
assertTrue(planStr.contains("IndexScan"));
ResultSet<SqlRow> res1 = sql1.execute(null, sql);
ResultSet<SqlRow> res2 = sql2.execute(null, sql);
intRes = res1.next().intValue(0);
assertEquals(intRes, res2.next().intValue(0));
res1.close();
res2.close();
stopNode(0);
ignite1 = startNode(0);
sql1 = ignite1.sql();
ResultSet<SqlRow> res3 = sql1.execute(null, sql);
assertEquals(intRes, res3.next().intValue(0));
}
/**
* Restarts a node with a changed configuration.
*/
@Test
public void changeConfigurationOnStartTest() {
IgniteImpl ignite = startNode(0);
int nodePort = ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).port().value();
assertEquals(DEFAULT_NODE_PORT, nodePort);
stopNode(0);
int newPort = 3322;
String updateCfg = "network.port=" + newPort;
ignite = startNode(0, updateCfg);
nodePort = ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).port().value();
assertEquals(newPort, nodePort);
}
/**
* Tests that a node's new attributes configuration is applied after the node restarts.
*/
@Test
public void changeNodeAttributesConfigurationOnStartTest() {
IgniteImpl ignite = startNode(0);
Map<String, String> attributes = new HashMap<>();
NodeAttributesConfiguration attributesConfiguration = ignite.nodeConfiguration().getConfiguration(NodeAttributesConfiguration.KEY);
attributesConfiguration.nodeAttributes().value().namedListKeys().forEach(
key -> attributes.put(key, attributesConfiguration.nodeAttributes().get(key).attribute().value())
);
assertEquals(Collections.emptyMap(), attributes);
stopNode(0);
String newAttributesCfg = "{\n"
+ " region.attribute = \"US\"\n"
+ " storage.attribute = \"SSD\"\n"
+ "}";
Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD");
String updateCfg = "nodeAttributes.nodeAttributes=" + newAttributesCfg;
ignite = startNode(0, updateCfg);
NodeAttributesConfiguration newAttributesConfiguration =
ignite.nodeConfiguration().getConfiguration(NodeAttributesConfiguration.KEY);
Map<String, String> newAttributes = new HashMap<>();
newAttributesConfiguration.nodeAttributes().value().namedListKeys().forEach(
key -> newAttributes.put(key, newAttributesConfiguration.nodeAttributes().get(key).attribute().value())
);
assertEquals(newAttributesMap, newAttributes);
}
/**
* Restarts a node that stores some data.
*/
@Test
public void nodeWithDataTest() {
IgniteImpl ignite = startNode(0);
createTableWithData(List.of(ignite), TABLE_NAME, 1);
stopNode(0);
ignite = startNode(0);
checkTableWithData(ignite, TABLE_NAME);
}
@Test
public void testNodeSeesItselfInLocalLogicalTopology() {
List<IgniteImpl> nodes = startNodes(3);
// Here we check that node sees itself in local logical topology.
nodes.forEach(node -> assertTrue(node.logicalTopologyService().localLogicalTopology().nodes().stream().map(LogicalNode::id)
.collect(toSet()).contains(node.id())));
// Actually, we have stronger guarantees because startNodes waits for all nodes to start.
// On one node (cmg leader) we will see all three nodes in local logical topology.
// On the node that started second we will see at least two nodes.
// On the third node we will see all three nodes.
// All in all that means that in total we will see at least (3 + 2 + 3) nodes.
Integer sumOfLogicalTopologyProjectionSizes =
nodes.stream().map(node -> node.logicalTopologyService().localLogicalTopology().nodes().size())
.reduce(0, Integer::sum);
assertTrue(sumOfLogicalTopologyProjectionSizes >= 3 + 2 + 3);
}
/**
* Restarts a node and checks that Meta Storage entries written while it was down are applied on recovery.
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void metastorageRecoveryTest(boolean useSnapshot) throws InterruptedException {
List<IgniteImpl> nodes = startNodes(2);
IgniteImpl main = nodes.get(0);
createTableWithData(List.of(main), TABLE_NAME, 1);
stopNode(1);
MetaStorageManager metaStorageManager = main.metaStorageManager();
CompletableFuture[] futs = new CompletableFuture[10];
for (int i = 0; i < 10; i++) {
// Put some data into the MetaStorage so that there will be new entries to apply to the restarting node.
ByteArray key = ByteArray.fromString("some-test-key-" + i);
futs[i] = metaStorageManager.put(key, new byte[]{(byte) i});
}
assertThat(CompletableFuture.allOf(futs), willSucceedFast());
if (useSnapshot) {
forceSnapshotUsageOnRestart(main);
}
IgniteImpl second = startNode(1);
checkTableWithData(second, TABLE_NAME);
MetaStorageManager restartedMs = second.metaStorageManager();
for (int i = 0; i < 10; i++) {
ByteArray key = ByteArray.fromString("some-test-key-" + i);
byte[] value = restartedMs.getLocally(key, 100).value();
assertEquals(1, value.length);
assertEquals((byte) i, value[0]);
}
}
/**
* Checks that the logical topology version is maintained after node restarts.
*/
@Test
public void logicalTopologyVersionMaintainedTest() {
IgniteImpl main = startNodes(3).get(0);
stopNode(1);
IgniteImpl restarted = startNode(1);
stopNode(2);
IgniteImpl secondRestarted = startNode(2);
long mainVersion = main.logicalTopologyService().localLogicalTopology().version();
long restartedVersion = restarted.logicalTopologyService().localLogicalTopology().version();
long secondRestartedVersion = secondRestarted.logicalTopologyService().localLogicalTopology().version();
assertEquals(mainVersion, restartedVersion);
assertEquals(mainVersion, secondRestartedVersion);
}
private static void forceSnapshotUsageOnRestart(IgniteImpl main) throws InterruptedException {
// Force log truncation so that the restarting node will request a snapshot.
JraftServerImpl server = (JraftServerImpl) main.raftManager().server();
List<Peer> peers = server.localPeers(MetastorageGroupId.INSTANCE);
Peer learnerPeer = peers.stream().filter(peer -> peer.idx() == 0).findFirst().orElseThrow(
() -> new IllegalStateException("No peer with index 0 found")
);
var nodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, learnerPeer);
RaftGroupService raftGroupService = server.raftGroupService(nodeId);
for (int i = 0; i < 2; i++) {
// Log must be truncated twice.
CountDownLatch snapshotLatch = new CountDownLatch(1);
AtomicReference<Status> snapshotStatus = new AtomicReference<>();
raftGroupService.getRaftNode().snapshot(status -> {
snapshotStatus.set(status);
snapshotLatch.countDown();
});
assertTrue(snapshotLatch.await(10, TimeUnit.SECONDS), "Snapshot was not finished in time");
assertTrue(snapshotStatus.get().isOk(), "Snapshot failed: " + snapshotStatus.get());
}
}
/**
* Restarts a node that stores some data and needs to rebuild an index on recovery.
*/
@Test
public void nodeWithDataAndIndexRebuildTest() {
IgniteImpl ignite = startNode(0);
int partitions = 20;
createTableWithData(List.of(ignite), TABLE_NAME, 1, partitions);
TableViewInternal table = unwrapTableViewInternal(ignite.tables().table(TABLE_NAME));
InternalTableImpl internalTable = (InternalTableImpl) table.internalTable();
CompletableFuture[] flushFuts = new CompletableFuture[partitions];
for (int i = 0; i < partitions; i++) {
int finalI = i;
// Flush data to disk so that there will be a snapshot to read on restart.
flushFuts[i] = bypassingThreadAssertionsAsync(() -> internalTable.storage().getMvPartition(finalI).flush());
}
assertThat(CompletableFuture.allOf(flushFuts), willCompleteSuccessfully());
// Add more data so that an index rebuild operation happens on restart.
for (int i = 0; i < 100; i++) {
ignite.sql().execute(null, "INSERT INTO " + TABLE_NAME + "(id, name) VALUES (?, ?)",
i + 500, VALUE_PRODUCER.apply(i + 500));
}
stopNode(0);
ignite = startNode(0);
checkTableWithData(ignite, TABLE_NAME);
TableViewInternal tableAfterRestart = unwrapTableViewInternal(ignite.tables().table(TABLE_NAME));
// Check data that was added after flush.
bypassingThreadAssertions(() -> {
for (int i = 0; i < 100; i++) {
Tuple row = tableAfterRestart.keyValueView().get(null, Tuple.create().set("id", i + 500));
Objects.requireNonNull(row, "row");
assertEquals(VALUE_PRODUCER.apply(i + 500), row.stringValue("name"));
}
});
}
/**
* Starts two nodes and checks that the data survives restarts. Nodes are restarted in the same order in which they were originally started.
*/
@Test
public void testTwoNodesRestartDirect() {
twoNodesRestart(true);
}
/**
* Starts two nodes and checks that the data survives restarts. Nodes are restarted in the reverse order from which they were originally started.
*/
@Test
public void testTwoNodesRestartReverse() {
twoNodesRestart(false);
}
/**
* Starts two nodes and checks that the data survives restarts.
*
* @param directOrder If {@code true}, nodes are restarted in the original start order; otherwise, in reverse order.
*/
private void twoNodesRestart(boolean directOrder) {
List<IgniteImpl> nodes = startNodes(2);
createTableWithData(nodes, TABLE_NAME, 2);
createTableWithData(nodes, TABLE_NAME_2, 2);
stopNode(0);
stopNode(1);
Ignite ignite;
if (directOrder) {
startNode(0);
ignite = startNode(1);
} else {
// Since the first node is the CMG leader, the second node can't be started synchronously (it won't be able to join the cluster
// and the future will never resolve).
CompletableFuture<Ignite> future = startNodeAsync(1, null);
startNode(0);
assertThat(future, willCompleteSuccessfully());
ignite = future.join();
}
checkTableWithData(ignite, TABLE_NAME);
checkTableWithData(ignite, TABLE_NAME_2);
}
/**
* Checks that the table with the given name is present in the TableManager.
*
* @param tableManager Table manager.
* @param tableName Table name.
*/
private void assertTablePresent(TableManager tableManager, String tableName) {
Collection<TableImpl> tables = tableManager.startedTables().values();
boolean isPresent = false;
for (TableImpl table : tables) {
if (table.name().equals(tableName)) {
isPresent = true;
break;
}
}
assertTrue(isPresent, "tableName=" + tableName + ", tables=" + tables);
}
/**
* Checks that one node in a cluster of 2 nodes is able to restart and recover a table that was created while this node was absent. Also
* checks that the table created before the node stopped is not available when the majority is lost.
*/
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-20137")
public void testOneNodeRestartWithGap() {
IgniteImpl ignite = startNode(0);
startNode(1);
createTableWithData(List.of(ignite), TABLE_NAME, 2);
stopNode(1);
Table table = ignite.tables().table(TABLE_NAME);
assertNotNull(table);
assertThrows(TransactionException.class, () -> table.keyValueView().get(null, Tuple.create().set("id", 0)));
createTableWithoutData(ignite, TABLE_NAME_2, 1, 1);
IgniteImpl ignite1 = startNode(1);
TableManager tableManager = (TableManager) ignite1.tables();
assertNotNull(tableManager);
assertTablePresent(tableManager, TABLE_NAME.toUpperCase());
assertTablePresent(tableManager, TABLE_NAME_2.toUpperCase());
}
/**
* Checks that a table created in a cluster of 2 nodes is recovered on a node after that node restarts.
*/
@Test
public void testRecoveryOnOneNode() {
IgniteImpl ignite = startNode(0);
IgniteImpl node = startNode(1);
createTableWithData(List.of(ignite), TABLE_NAME, 2, 1);
stopNode(1);
node = startNode(1);
TableManager tableManager = unwrapTableManager(node.tables());
assertNotNull(tableManager);
assertTablePresent(tableManager, TABLE_NAME.toUpperCase());
}
/**
* Checks that a cluster is able to restart when some changes were made to the configuration.
*/
@Test
public void testRestartDiffConfig() {
List<IgniteImpl> ignites = startNodes(2);
createTableWithData(ignites, TABLE_NAME, 2);
createTableWithData(ignites, TABLE_NAME_2, 2);
stopNode(0);
stopNode(1);
startNode(0);
@Language("HOCON") String cfgString = IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG,
DEFAULT_NODE_PORT + 11,
"[\"localhost:" + DEFAULT_NODE_PORT + "\"]",
DEFAULT_CLIENT_PORT + 11,
DEFAULT_HTTP_PORT + 11,
DEFAULT_HTTPS_PORT + 11,
"{}"
);
IgniteImpl node1 = startNode(1, cfgString);
TableManager tableManager = unwrapTableManager(node1.tables());
assertTablePresent(tableManager, TABLE_NAME.toUpperCase());
}
/**
* Tests node restart when there is a gap between the node's local configuration and the distributed configuration.
*/
@Test
public void testCfgGapWithoutData() {
List<IgniteImpl> nodes = startNodes(3);
createTableWithData(nodes, TABLE_NAME, nodes.size());
log.info("Stopping the node.");
stopNode(nodes.size() - 1);
nodes.set(nodes.size() - 1, null);
createTableWithData(nodes, TABLE_NAME_2, nodes.size());
createTableWithData(nodes, TABLE_NAME_2 + "0", nodes.size());
log.info("Starting the node.");
IgniteImpl node = startNode(nodes.size() - 1, null);
log.info("After starting the node.");
TableManager tableManager = unwrapTableManager(node.tables());
assertTablePresent(tableManager, TABLE_NAME.toUpperCase());
assertTablePresent(tableManager, TABLE_NAME_2.toUpperCase());
}
/**
* Tests node restart when there is a gap between the node's local configuration and the distributed configuration, and the metastorage
* group stops for some time while the restarting node is being recovered. The recovery process should continue and eventually succeed
* after the metastorage group starts again.
*/
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-18919")
public void testMetastorageStop() throws NodeStoppingException {
int cfgGap = 4;
List<IgniteImpl> nodes = startNodes(3);
log.info("Stopping the node.");
stopNode(nodes.size() - 1);
nodes.set(nodes.size() - 1, null);
for (int i = 0; i < cfgGap; i++) {
createTableWithData(nodes, "t" + i, nodes.size(), 1);
}
log.info("Starting the node.");
PartialNode partialNode = startPartialNode(
nodes.size() - 1,
configurationString(nodes.size() - 1)
// TODO IGNITE-18919 here the revision callback was removed, because meta storage recovery process was changed.
);
TableManager tableManager = findComponent(partialNode.startedComponents(), TableManager.class);
for (int i = 0; i < cfgGap; i++) {
assertTablePresent(tableManager, "T" + i);
}
}
/**
* Tests node restart when there is a gap between the node's local configuration and the distributed configuration.
*/
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-20996")
public void testCfgGap() {
List<IgniteImpl> nodes = startNodes(4);
createTableWithData(nodes, "t1", nodes.size());
log.info("Stopping the node.");
stopNode(nodes.size() - 1);
nodes.set(nodes.size() - 1, null);
checkTableWithData(nodes.get(0), "t1");
createTableWithData(nodes, "t2", nodes.size());
log.info("Starting the node.");
IgniteImpl newNode = startNode(nodes.size() - 1);
checkTableWithData(nodes.get(0), "t1");
checkTableWithData(nodes.get(0), "t2");
checkTableWithData(newNode, "t1");
checkTableWithData(newNode, "t2");
}
/**
* Tests updating the cluster configuration with the default value.
* Checks that new nodes are able to synchronize the local cluster configuration.
*/
@Test
public void updateClusterCfgWithDefaultValue() {
IgniteImpl ignite = startNode(0);
GcConfiguration gcConfiguration = ignite.clusterConfiguration()
.getConfiguration(GcConfiguration.KEY);
int defaultValue = gcConfiguration.batchSize().value();
CompletableFuture<Void> update = gcConfiguration.batchSize().update(defaultValue);
assertThat(update, willCompleteSuccessfully());
stopNode(0);
startNodes(3);
}
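/**
 * Checks that partition storages that no longer belong to a node (the zone was altered to one replica per partition while the node's
 * Meta Storage watches were inhibited and the node was then stopped) are destroyed when that node restarts.
 */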
@Test
public void destroyObsoleteStoragesOnRestart() throws InterruptedException {
int nodesCount = 3;
List<IgniteImpl> nodes = startNodes(nodesCount);
int partitions = nodesCount;
int replicas = nodesCount;
createTableWithData(nodes, TABLE_NAME, replicas, partitions);
int restartedNodeIndex = nodesCount - 1;
WatchListenerInhibitor inhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(nodes.get(restartedNodeIndex));
inhibitor.startInhibit();
// Change the zone - one replica per partition.
alterZone(nodes.get(0).catalogManager(), String.format("ZONE_%s", TABLE_NAME.toUpperCase()), 1);
stopNode(restartedNodeIndex);
inhibitor.stopInhibit();
IgniteImpl restartedNode = startNode(restartedNodeIndex);
TableImpl table = unwrapTableImpl(restartedNode.tables().table(TABLE_NAME));
assertTrue(waitForCondition(() -> {
// Check that only the storage for 1 partition is left on the restarted node.
return IntStream.range(0, partitions)
.mapToObj(i -> table.internalTable().storage().getMvPartition(i))
.filter(Objects::nonNull)
.count() == 1;
}, 10_000));
}
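/**
 * Checks that after a restart driven by a Meta Storage snapshot, partition Raft nodes are started on the restarted node only for the
 * partitions that are assigned to it according to the recovered stable assignments.
 */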
@Test
public void testCorrectPartitionRecoveryOnSnapshot() throws InterruptedException {
int nodesCount = 3;
List<IgniteImpl> nodes = startNodes(nodesCount);
int partitions = nodesCount;
int replicas = nodesCount;
createTableWithData(nodes, TABLE_NAME, replicas, partitions);
int restartedNodeIndex = nodesCount - 1;
WatchListenerInhibitor inhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(nodes.get(restartedNodeIndex));
inhibitor.startInhibit();
alterZone(nodes.get(0).catalogManager(), String.format("ZONE_%s", TABLE_NAME.toUpperCase()), 1);
stopNode(restartedNodeIndex);
inhibitor.stopInhibit();
forceSnapshotUsageOnRestart(nodes.get(0));
IgniteImpl restartedNode = startNode(restartedNodeIndex);
TableImpl table = unwrapTableImpl(restartedNode.tables().table(TABLE_NAME));
long recoveryRevision = restartedNode.metaStorageManager().recoveryFinishedFuture().join();
PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(nodes.stream().map(IgniteImpl::name)
.collect(toSet()), Set.of());
for (int p = 0; p < partitions; p++) {
TablePartitionId tablePartitionId = new TablePartitionId(table.tableId(), p);
Entry e = restartedNode.metaStorageManager().getLocally(stablePartAssignmentsKey(tablePartitionId), recoveryRevision);
Set<Assignment> assignment = Assignments.fromBytes(e.value()).nodes();
boolean shouldBe = assignment.stream().anyMatch(n -> n.consistentId().equals(restartedNode.name()));
Peer peer = configuration.peer(restartedNode.name());
boolean isStarted = restartedNode.raftManager().isStarted(new RaftNodeId(tablePartitionId, peer));
assertEquals(shouldBe, isStarted);
}
}
/**
* Checks that after a create table call, the stable assignments have the same value in every node's meta storage, and that they were
* changed only once.
*
* @param populateStableAssignmentsBeforeTableCreation Whether to populate stable assignments before table creation.
* @param restart Whether to restart one of the nodes while creating the table.
* @throws InterruptedException If interrupted.
*/
@ParameterizedTest
@CsvSource({
"true,true",
"true,false",
"false,true",
"false,false"
})
public void createTableCallOnMultipleNodesTest(boolean populateStableAssignmentsBeforeTableCreation, boolean restart)
throws InterruptedException {
int nodesCount = 3;
var nodes = startNodes(3);
var node = nodes.get(0);
Map<Integer, AtomicInteger> stableAssignmentsChangeCounters = new ConcurrentHashMap<>();
Map<Integer, AtomicBoolean> lateChangeFlag = new ConcurrentHashMap<>();
// Prefix that will be updated after the table creation.
String testPrefix = "testPrefix";
for (int i = 0; i < nodesCount; i++) {
stableAssignmentsChangeCounters.put(i, new AtomicInteger());
lateChangeFlag.put(i, new AtomicBoolean());
final int fi = i;
createWatchListener(
nodes.get(i).metaStorageManager(),
STABLE_ASSIGNMENTS_PREFIX,
e -> stableAssignmentsChangeCounters.get(fi).incrementAndGet()
);
createWatchListener(
nodes.get(i).metaStorageManager(),
testPrefix,
e -> lateChangeFlag.get(fi).set(true)
);
}
var partId = new TablePartitionId(TABLE_ID, 0);
// Populate the stable assignments before calling table create, if needed.
if (populateStableAssignmentsBeforeTableCreation) {
node.metaStorageManager().put(stablePartAssignmentsKey(partId), Assignments.toBytes(Set.of(Assignment.forPeer(node.name()))));
waitForCondition(() -> lateChangeFlag.values().stream().allMatch(AtomicBoolean::get), 5_000);
lateChangeFlag.values().forEach(v -> v.set(false));
}
String zoneName = "TEST_ZONE";
if (restart) {
stopNode(nodesCount - 2);
}
IgniteSql sql = node.sql();
sql.execute(null, String.format("CREATE ZONE IF NOT EXISTS %s WITH REPLICAS=%d, PARTITIONS=%d, STORAGE_PROFILES='%s'",
zoneName, nodesCount, 1, DEFAULT_STORAGE_PROFILE));
sql.execute(null, "CREATE TABLE " + TABLE_NAME
+ "(id INT PRIMARY KEY, name VARCHAR) WITH PRIMARY_ZONE='" + zoneName + "';");
assertEquals(TABLE_ID, tableId(node, TABLE_NAME));
node.metaStorageManager().put(new ByteArray(testPrefix.getBytes(StandardCharsets.UTF_8)), new byte[0]);
if (restart) {
IgniteImpl restartedNode = startNode(nodesCount - 2);
nodes.set(nodesCount - 2, restartedNode);
}
// Wait for the late prefix update on all nodes.
waitForCondition(() -> lateChangeFlag.values().stream().allMatch(AtomicBoolean::get), 5_000);
var assignmentsKey = stablePartAssignmentsKey(partId).bytes();
Set<Assignment> expectedAssignments = getAssignmentsFromMetaStorage(node.metaStorageManager(), assignmentsKey);
// Check that the stable assignments are the same and were changed only once on every node.
for (int i = 0; i < nodesCount; i++) {
if (!restart) {
// TODO IGNITE-21194 return this check
assertEquals(1, stableAssignmentsChangeCounters.get(i).get(), "node index=" + i);
}
assertEquals(
expectedAssignments,
getAssignmentsFromMetaStorage(nodes.get(i).metaStorageManager(), assignmentsKey),
"node index=" + i
);
}
}
private void createWatchListener(MetaStorageManager metaStorageManager, String prefix, Consumer<WatchEvent> listener) {
metaStorageManager.registerPrefixWatch(
new ByteArray(prefix.getBytes(StandardCharsets.UTF_8)),
new WatchListener() {
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
listener.accept(event);
return nullCompletedFuture();
}
@Override
public void onError(Throwable e) {
log.error("Error in test watch listener", e);
}
}
);
}
/**
* Creates a table on a cluster of 3 nodes, delays the table start processing, and stops 2 nodes before the assignments are applied
* to the meta storage. The remaining node is hanging, so it doesn't write the updates to the meta storage either. Then the stopped
* nodes are restarted and receive different values of data nodes from the distribution zone manager. We check that the calculated
* assignments for the table are eventually equal in every node's local meta storage, and that they are equal to the expected ones:
* the assignments of the first node that is able to initialize them during the table start.
*
* @param nodeThatWrittenAssignments The index of the restarted node that has written the calculated assignments successfully.
* @param nodeThatPicksUpAssignments The index of the restarted node that picks up the assignments from the meta storage.
* @throws Exception If failed.
*/
@ParameterizedTest
@CsvSource({
"1,2",
"2,1"
})
public void tableRecoveryOnMultipleRestartingNodes(int nodeThatWrittenAssignments, int nodeThatPicksUpAssignments) throws Exception {
var node0 = startNode(0);
int idx1 = 1;
int idx2 = 2;
var node1 = startPartialNode(idx1, configurationString(idx1));
var node2 = startPartialNode(idx2, configurationString(idx2));
String tableName = "TEST";
String zoneName = "ZONE_TEST";
var assignmentsKey = stablePartAssignmentsKey(new TablePartitionId(TABLE_ID, 0));
var metaStorageInterceptorFut = new CompletableFuture<>();
var metaStorageInterceptorInnerFut = new CompletableFuture<>();
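// Block the node that picks up assignments inside the Meta Storage invoke that writes the stable assignments,
// until the other restarted node has written them.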
metaStorageInvokeInterceptorByNode.put(nodeThatPicksUpAssignments,
(cond, success, failure) -> {
if (checkMetaStorageInvoke(success, assignmentsKey)) {
metaStorageInterceptorInnerFut.complete(null);
metaStorageInterceptorFut.join();
}
return null;
}
);
MetaStorageManager msManager1 = findComponent(node1.startedComponents(), MetaStorageManager.class);
MetaStorageManager msManager2 = findComponent(node2.startedComponents(), MetaStorageManager.class);
WatchListenerInhibitor nodeInhibitor0 = WatchListenerInhibitor.metastorageEventsInhibitor(node0.metaStorageManager());
WatchListenerInhibitor nodeInhibitor1 = WatchListenerInhibitor.metastorageEventsInhibitor(msManager1);
WatchListenerInhibitor nodeInhibitor2 = WatchListenerInhibitor.metastorageEventsInhibitor(msManager2);
// Create table, all nodes are lagging.
IgniteSql sql = node0.sql();
sql.execute(null, String.format("CREATE ZONE IF NOT EXISTS %s WITH REPLICAS=%d, PARTITIONS=%d, STORAGE_PROFILES='%s'",
zoneName, 2, 1, DEFAULT_STORAGE_PROFILE));
nodeInhibitor0.startInhibit();
nodeInhibitor1.startInhibit();
nodeInhibitor2.startInhibit();
sql.executeAsync(null, "CREATE TABLE " + tableName
+ "(id INT PRIMARY KEY, name VARCHAR) WITH PRIMARY_ZONE='" + zoneName + "';");
// Stopping 2 of 3 nodes.
node1.stop();
node2.stop();
Set<String> dataNodesOnNode1 = Set.of(node0.name(), node1.name());
Set<String> dataNodesOnNode2 = Set.of(node1.name(), node2.name());
dataNodesMockByNode.put(idx1, () -> completedFuture(dataNodesOnNode1));
dataNodesMockByNode.put(idx2, () -> completedFuture(dataNodesOnNode2));
AtomicReference<PartialNode> nodeWnRef = new AtomicReference<>();
AtomicReference<PartialNode> nodePnRef = new AtomicReference<>();
// Restarting 2 nodes.
var nodePnFut = runAsync(() ->
nodePnRef.set(startPartialNode(nodeThatPicksUpAssignments, configurationString(nodeThatPicksUpAssignments))));
assertThat(metaStorageInterceptorInnerFut, willCompleteSuccessfully());
var nodeWnFut = runAsync(() ->
nodeWnRef.set(startPartialNode(nodeThatWrittenAssignments, configurationString(nodeThatWrittenAssignments))));
assertThat(nodeWnFut, willCompleteSuccessfully());
var msManagerRestartedW = findComponent(nodeWnRef.get().startedComponents(), MetaStorageManager.class);
waitForValueInLocalMs(msManagerRestartedW, assignmentsKey);
metaStorageInterceptorFut.complete(null);
assertThat(nodePnFut, willCompleteSuccessfully());
var msManagerRestartedP = findComponent(nodePnRef.get().startedComponents(), MetaStorageManager.class);
waitForValueInLocalMs(msManagerRestartedP, assignmentsKey);
// Node 0 stops hanging.
nodeInhibitor0.stopInhibit();
waitForValueInLocalMs(node0.metaStorageManager(), assignmentsKey);
assertEquals(TABLE_ID, tableId(node0, tableName));
Set<Assignment> expectedAssignments = dataNodesMockByNode.get(nodeThatWrittenAssignments).get().join()
.stream().map(Assignment::forPeer).collect(toSet());
checkAssignmentsInMetaStorage(node0.metaStorageManager(), assignmentsKey.bytes(), expectedAssignments);
checkAssignmentsInMetaStorage(msManagerRestartedW, assignmentsKey.bytes(), expectedAssignments);
checkAssignmentsInMetaStorage(msManagerRestartedP, assignmentsKey.bytes(), expectedAssignments);
}
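/**
 * Checks that an asynchronous table creation followed by an alter zone command is processed correctly when a node restarts from a
 * Meta Storage snapshot: both catalog updates eventually complete and the stable assignments converge to a single replica on all nodes.
 */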
@Test
public void testSequentialAsyncTableCreationThenAlterZoneThenRestartOnMsSnapshot() throws InterruptedException {
var node0 = startNode(0);
var node1 = startNode(1);
String tableName = "TEST";
String zoneName = "ZONE_TEST";
node0.sql().execute(null,
String.format("CREATE ZONE IF NOT EXISTS %s WITH REPLICAS=%d, PARTITIONS=%d, STORAGE_PROFILES='%s'",
zoneName, 2, 1, DEFAULT_STORAGE_PROFILE));
int catalogVersionBeforeTable = node0.catalogManager().latestCatalogVersion();
WatchListenerInhibitor nodeInhibitor0 = WatchListenerInhibitor.metastorageEventsInhibitor(node0);
WatchListenerInhibitor nodeInhibitor1 = WatchListenerInhibitor.metastorageEventsInhibitor(node1);
nodeInhibitor0.startInhibit();
nodeInhibitor1.startInhibit();
var assignmentsKey = stablePartAssignmentsKey(new TablePartitionId(TABLE_ID, 0));
var tableFut = createTableInCatalog(node0.catalogManager(), tableName, zoneName);
stopNode(1);
var alterZoneFut = alterZoneAsync(node0.catalogManager(), zoneName, 1);
// Wait for the next catalog version, which corresponds to the table creation.
// The next catalog update (alter zone) can't be processed until the table creation is completed.
assertTrue(waitForCondition(() -> {
int ver = latestCatalogVersionInMs(node0.metaStorageManager());
return ver == catalogVersionBeforeTable + 1;
}, 10_000));
// Double-check that the version is exactly as expected; if it is not, the synchronous sequential catalog update processing
// may be broken.
assertEquals(catalogVersionBeforeTable + 1, latestCatalogVersionInMs(node0.metaStorageManager()));
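// Short pause before forcing the meta storage snapshot, presumably to let in-flight updates settle.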
Thread.sleep(1000);
forceSnapshotUsageOnRestart(node0);
log.info("Restarting node 1.");
node1 = startNode(1);
waitForValueInLocalMs(node1.metaStorageManager(), assignmentsKey);
nodeInhibitor0.stopInhibit();
assertThat(tableFut, willCompleteSuccessfully());
assertThat(alterZoneFut, willCompleteSuccessfully());
assertEquals(TABLE_ID, tableId(node0, tableName));
waitForValueInLocalMs(node0.metaStorageManager(), assignmentsKey);
var finalNode1 = node1;
// The restart is followed by a rebalance, because the data nodes are recalculated once the table creation completes, which happens
// after the restart.
assertTrue(waitForCondition(() -> {
Set<Assignment> assignments0 = getAssignmentsFromMetaStorage(node0.metaStorageManager(), assignmentsKey.bytes());
Set<Assignment> assignments1 = getAssignmentsFromMetaStorage(finalNode1.metaStorageManager(), assignmentsKey.bytes());
return assignments0.size() == 1 && assignments0.equals(assignments1);
}, 10_000));
}
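/** Returns the latest catalog version stored in the local meta storage, or {@code -1} if the key is absent or empty. */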
private int latestCatalogVersionInMs(MetaStorageManager metaStorageManager) {
var e = metaStorageManager.getLocally(new ByteArray("catalog.version".getBytes(StandardCharsets.UTF_8)), 1000);
if (e == null || e.empty()) {
return -1;
}
return ByteUtils.bytesToInt(e.value());
}
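/** Creates a table with a single {@code INT32} primary key column in the given zone directly via the catalog manager. */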
private static CompletableFuture<?> createTableInCatalog(CatalogManager catalogManager, String tableName, String zoneName) {
var tableColumn = ColumnParams.builder().name("id").type(INT32).build();
TableHashPrimaryKey primaryKey = TableHashPrimaryKey.builder()
.columns(List.of("id"))
.build();
var createTableCommand = CreateTableCommand.builder()
.schemaName("PUBLIC")
.tableName(tableName)
.columns(List.of(tableColumn))
.primaryKey(primaryKey)
.zone(zoneName)
.build();
return catalogManager.execute(createTableCommand);
}
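/** Alters the number of replicas of the given zone via the catalog manager. */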
private static CompletableFuture<?> alterZoneAsync(
CatalogManager catalogManager,
String zoneName,
@Nullable Integer replicas
) {
AlterZoneCommandBuilder builder = AlterZoneCommand.builder().zoneName(zoneName);
builder.replicas(replicas);
return catalogManager.execute(builder.build());
}
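/** Waits until a non-empty value appears for the given key in the node's local meta storage. */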
private void waitForValueInLocalMs(MetaStorageManager metaStorageManager, ByteArray key) throws InterruptedException {
assertTrue(waitForCondition(() -> {
var e = metaStorageManager.getLocally(key, metaStorageManager.appliedRevision());
return e != null && !e.empty();
}, 10_000));
}
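/** Checks whether any of the given operations targets the specified key. */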
private boolean checkMetaStorageInvoke(Collection<Operation> ops, ByteArray key) {
var k = new String(key.bytes(), StandardCharsets.UTF_8);
return ops.stream().anyMatch(op -> new String(op.key(), StandardCharsets.UTF_8).equals(k));
}
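/** Asserts that the assignments stored in the local meta storage under the given key match the expected set. */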
private void checkAssignmentsInMetaStorage(MetaStorageManager metaStorageManager, byte[] assignmentsKey, Set<Assignment> expected) {
Set<Assignment> a = getAssignmentsFromMetaStorage(metaStorageManager, assignmentsKey);
assertEquals(expected, a);
}
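/** Reads assignments from the local meta storage; returns an empty set if the entry is absent, empty, or a tombstone. */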
private Set<Assignment> getAssignmentsFromMetaStorage(MetaStorageManager metaStorageManager, byte[] assignmentsKey) {
var e = metaStorageManager.getLocally(new ByteArray(assignmentsKey), metaStorageManager.appliedRevision());
return e == null || e.tombstone() || e.empty()
? emptySet()
: Assignments.fromBytes(e.value()).nodes();
}
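/** Returns the ID of the given table on the given node. */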
private int tableId(Ignite node, String tableName) {
return unwrapTableImpl(node.tables().table(tableName)).tableId();
}
/**
* Checks that the table exists and validates all data in it.
*
* @param ignite Ignite.
* @param name Table name.
*/
private static void checkTableWithData(Ignite ignite, String name) {
Table table = ignite.tables().table(name);
assertNotNull(table);
for (int i = 0; i < 100; i++) {
int fi = i;
Awaitility.with()
.await()
.pollInterval(100, TimeUnit.MILLISECONDS)
.pollDelay(0, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> {
try {
Tuple row = table.keyValueView().get(null, Tuple.create().set("id", fi));
if (row == null) {
return false;
} else {
assertEquals(VALUE_PRODUCER.apply(fi), row.stringValue("name"));
return true;
}
} catch (TransactionException te) {
// There may be an exception if the primary replica node was stopped. We should wait for a new primary to appear.
return false;
}
});
}
}
/**
* Creates a table and loads data into it.
*
* @param nodes Ignite nodes.
* @param name Table name.
* @param replicas Replica factor.
*/
private void createTableWithData(List<IgniteImpl> nodes, String name, int replicas) {
createTableWithData(nodes, name, replicas, 2);
}
/**
* Creates a table and loads data into it.
*
* @param nodes Ignite nodes.
* @param name Table name.
* @param replicas Replica factor.
* @param partitions Partitions count.
*/
private void createTableWithData(List<IgniteImpl> nodes, String name, int replicas, int partitions) {
IgniteSql sql = nodes.get(0).sql();
sql.execute(null,
String.format("CREATE ZONE IF NOT EXISTS ZONE_%s WITH REPLICAS=%d, PARTITIONS=%d, STORAGE_PROFILES='%s'",
name, replicas, partitions, DEFAULT_STORAGE_PROFILE));
sql.execute(null, "CREATE TABLE IF NOT EXISTS " + name
+ "(id INT PRIMARY KEY, name VARCHAR) WITH PRIMARY_ZONE='ZONE_" + name.toUpperCase() + "';");
for (int i = 0; i < 100; i++) {
sql.execute(null, "INSERT INTO " + name + "(id, name) VALUES (?, ?)",
i, VALUE_PRODUCER.apply(i));
}
}
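/**
 * Waits until the index with the given name becomes available on every given node.
 *
 * @param nodes Ignite nodes.
 * @param indexName Index name.
 */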
private void waitForIndexToBecomeAvailable(Collection<IgniteImpl> nodes, String indexName) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(nodes.size());
nodes.forEach(node -> node.catalogManager().listen(CatalogEvent.INDEX_AVAILABLE, event -> {
MakeIndexAvailableEventParameters availableEvent = (MakeIndexAvailableEventParameters) event;
CatalogIndexDescriptor index = node.catalogManager().index(availableEvent.indexId(), event.catalogVersion());
assertNotNull(index, "Cannot find an index by ID=" + availableEvent.indexId());
if (index.name().equalsIgnoreCase(indexName)) {
// That's our index.
latch.countDown();
return trueCompletedFuture();
}
return falseCompletedFuture();
}));
assertTrue(latch.await(10, TimeUnit.SECONDS));
}
/**
* Creates a table.
*
* @param ignite Ignite.
* @param name Table name.
* @param replicas Replica factor.
* @param partitions Partitions count.
*/
private static Table createTableWithoutData(Ignite ignite, String name, int replicas, int partitions) {
IgniteSql sql = ignite.sql();
sql.execute(null,
String.format("CREATE ZONE IF NOT EXISTS ZONE_%s WITH REPLICAS=%d, PARTITIONS=%d, STORAGE_PROFILES='%s'",
name, replicas, partitions, DEFAULT_STORAGE_PROFILE));
sql.execute(null, "CREATE TABLE " + name
+ "(id INT PRIMARY KEY, name VARCHAR) WITH PRIMARY_ZONE='ZONE_" + name.toUpperCase() + "';");
return ignite.tables().table(name);
}
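/** Test hook for intercepting meta storage invoke calls. */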
private interface InvokeInterceptor {
Boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure);
}
}