blob: 0537d7efece1f7e01abdb4fa8d3eec3e5e2f71f6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.runner.app;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.InitParameters;
import org.apache.ignite.internal.BaseIgniteRestartTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.ClockWaiter;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.ConfigurationModules;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
import org.apache.ignite.internal.configuration.NodeConfigWriteException;
import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
import org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.recovery.VaultStateIds;
import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModule;
import org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.storage.configurations.StoragesConfiguration;
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;
import org.awaitility.Awaitility;
import org.intellij.lang.annotations.Language;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
/**
* These tests check node restart scenarios.
*/
@ExtendWith(ConfigurationExtension.class)
@Timeout(120)
public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
/** Value producer for table data, is used to create data and check it later. */
private static final IntFunction<String> VALUE_PRODUCER = i -> "val " + i;
/** Name of the first test table. */
private static final String TABLE_NAME = "Table1";
/** Name of the second test table. */
private static final String TABLE_NAME_2 = "Table2";
/** Injected Raft configuration; passed to the Raft manager ({@code Loza}) of partial nodes. */
@InjectConfiguration("mock: " + RAFT_CFG)
private static RaftConfiguration raftConfiguration;
/** Injected cluster management configuration; passed to the cluster management group manager of partial nodes. */
@InjectConfiguration
private static ClusterManagementConfiguration clusterManagementConfiguration;
/** Injected node attributes configuration; passed to the node attributes collector of partial nodes. */
@InjectConfiguration
private static NodeAttributesConfiguration nodeAttributes;
/** Injected storages configuration; passed to the node attributes collector of partial nodes. */
@InjectConfiguration
private static StoragesConfiguration storagesConfiguration;
/** Injected Meta Storage configuration; passed to the Meta Storage manager of partial nodes. */
@InjectConfiguration
private static MetaStorageConfiguration metaStorageConfiguration;
/**
* Start some of Ignite components that are able to serve as Ignite node for test purposes.
*
* @param idx Node index.
* @param cfgString Configuration string or {@code null} to use the default configuration.
* @param revisionCallback Callback on storage revision update.
* @return Partial node.
*/
private PartialNode startPartialNode(
int idx,
@Nullable @Language("HOCON") String cfgString,
@Nullable Consumer<Long> revisionCallback
) {
// Every partial node gets its own name and its own subdirectory of the test work directory.
String name = testNodeName(testInfo, idx);
Path dir = workDir.resolve(name);
// Components are collected in the order they are started and handed over to the resulting PartialNode.
List<IgniteComponent> components = new ArrayList<>();
VaultManager vault = createVault(name, dir);
ConfigurationModules modules = loadConfigurationModules(log, Thread.currentThread().getContextClassLoader());
// NOTE(review): the config file is resolved against workDir rather than the per-node dir, so every partial node
// writes to the same file - confirm that this is intended.
Path configFile = workDir.resolve(TestIgnitionManager.DEFAULT_CONFIG_NAME);
// Fall back to the default configuration for this node index when no explicit configuration is given.
String configString = cfgString == null ? configurationString(idx) : cfgString;
try {
Files.writeString(configFile, configString);
} catch (IOException e) {
throw new NodeConfigWriteException("Failed to write config content to file.", e);
}
// Node-local configuration: backed by the file written above.
var localConfigurationGenerator = new ConfigurationTreeGenerator(
modules.local().rootKeys(),
modules.local().schemaExtensions(),
modules.local().polymorphicSchemaExtensions()
);
var nodeCfgMgr = new ConfigurationManager(
modules.local().rootKeys(),
new LocalFileConfigurationStorage(configFile, localConfigurationGenerator),
localConfigurationGenerator,
ConfigurationValidatorImpl.withDefaultValidators(localConfigurationGenerator, modules.local().validators())
);
// Networking layer: Netty bootstrap plus a test ScaleCube cluster service built on the node's network configuration.
NetworkConfiguration networkConfiguration = nodeCfgMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY);
var nettyBootstrapFactory = new NettyBootstrapFactory(networkConfiguration, name);
var clusterSvc = new TestScaleCubeClusterServiceFactory().createClusterService(
name,
networkConfiguration,
nettyBootstrapFactory,
defaultSerializationRegistry(),
new VaultStateIds(vault)
);
var hybridClock = new HybridClockImpl();
var raftGroupEventsClientListener = new RaftGroupEventsClientListener();
// Raft manager; stores its data under the per-node directory.
var raftMgr = new Loza(clusterSvc, raftConfiguration, dir, hybridClock, raftGroupEventsClientListener);
// Cluster management group (CMG) and the logical topology kept in its RocksDB-based state storage.
var clusterStateStorage = new RocksDbClusterStateStorage(dir.resolve("cmg"));
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var clusterInitializer = new ClusterInitializer(
clusterSvc,
hocon -> hocon,
new TestConfigurationValidator()
);
var cmgManager = new ClusterManagementGroupManager(
vault,
clusterSvc,
clusterInitializer,
raftMgr,
clusterStateStorage,
logicalTopology,
clusterManagementConfiguration,
new NodeAttributesCollector(nodeAttributes, storagesConfiguration)
);
LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier
= () -> TestIgnitionManager.DEFAULT_PARTITION_IDLE_SYNC_TIME_INTERVAL_MS;
var replicaService = new ReplicaService(clusterSvc.messagingService(), hybridClock);
var lockManager = new HeapLockManager();
// NOTE(review): a second ReplicaService built from identical arguments; it is only passed to SqlQueryProcessor below,
// while replicaService above is passed to TxManagerImpl and TableManager - confirm whether one instance should be shared.
ReplicaService replicaSvc = new ReplicaService(clusterSvc.messagingService(), hybridClock);
var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);
var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
clusterSvc,
logicalTopologyService,
Loza.FACTORY,
raftGroupEventsClientListener
);
// Meta Storage manager; its key-value storage lives under the per-node directory.
var metaStorageMgr = new MetaStorageManagerImpl(
vault,
clusterSvc,
cmgManager,
logicalTopologyService,
raftMgr,
new RocksDbKeyValueStorage(name, dir.resolve("metastorage")),
hybridClock,
topologyAwareRaftGroupServiceFactory,
metaStorageConfiguration
);
// Distributed (cluster-wide) configuration: stored in the Meta Storage.
var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr);
ConfigurationTreeGenerator distributedConfigurationGenerator = new ConfigurationTreeGenerator(
modules.distributed().rootKeys(),
modules.distributed().schemaExtensions(),
modules.distributed().polymorphicSchemaExtensions()
);
var clusterCfgMgr = new ConfigurationManager(
modules.distributed().rootKeys(),
cfgStorage,
distributedConfigurationGenerator,
ConfigurationValidatorImpl.withDefaultValidators(distributedConfigurationGenerator, modules.distributed().validators())
);
var placementDriverManager = new PlacementDriverManager(
name,
metaStorageMgr,
MetastorageGroupId.INSTANCE,
clusterSvc,
cmgManager::metaStorageNodes,
logicalTopologyService,
raftMgr,
topologyAwareRaftGroupServiceFactory,
hybridClock
);
ReplicaManager replicaMgr = new ReplicaManager(
name,
clusterSvc,
cmgManager,
hybridClock,
Set.of(TableMessageGroup.class, TxMessageGroup.class),
placementDriverManager.placementDriver(),
partitionIdleSafeTimePropagationPeriodMsSupplier
);
var txManager = new TxManagerImpl(
clusterSvc,
replicaService,
lockManager,
hybridClock,
new TransactionIdGenerator(idx),
placementDriverManager.placementDriver(),
partitionIdleSafeTimePropagationPeriodMsSupplier
);
ConfigurationRegistry clusterConfigRegistry = clusterCfgMgr.configurationRegistry();
// Adapter through which components register their revision update listeners in the Meta Storage manager.
Consumer<LongFunction<CompletableFuture<?>>> registry = (c) -> metaStorageMgr.registerRevisionUpdateListener(c::apply);
// Storage engines are discovered through ServiceLoader and keep partition data under the "db" directory of the node.
DataStorageModules dataStorageModules = new DataStorageModules(ServiceLoader.load(DataStorageModule.class));
Path storagePath = getPartitionsStorePath(dir);
DataStorageManager dataStorageManager = new DataStorageManager(
dataStorageModules.createStorageEngines(
name,
clusterConfigRegistry,
storagePath,
null
)
);
GcConfiguration gcConfig = clusterConfigRegistry.getConfiguration(GcConfiguration.KEY);
var clockWaiter = new ClockWaiter(name, hybridClock);
LongSupplier delayDurationMsSupplier = () -> TestIgnitionManager.DEFAULT_DELAY_DURATION_MS;
// Catalog manager; its update log is backed by the Meta Storage.
var catalogManager = new CatalogManagerImpl(
new UpdateLogImpl(metaStorageMgr),
clockWaiter,
delayDurationMsSupplier,
partitionIdleSafeTimePropagationPeriodMsSupplier
);
SchemaManager schemaManager = new SchemaManager(registry, catalogManager, metaStorageMgr);
DistributionZoneManager distributionZoneManager = new DistributionZoneManager(
name,
registry,
metaStorageMgr,
logicalTopologyService,
vault,
catalogManager
);
var schemaSyncService = new SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier);
TableManager tableManager = new TableManager(
name,
registry,
gcConfig,
clusterSvc,
raftMgr,
replicaMgr,
lockManager,
replicaService,
txManager,
dataStorageManager,
storagePath,
metaStorageMgr,
schemaManager,
view -> new LocalLogStorageFactory(),
hybridClock,
new OutgoingSnapshotsManager(clusterSvc.messagingService()),
topologyAwareRaftGroupServiceFactory,
vault,
distributionZoneManager,
schemaSyncService,
catalogManager,
new HybridTimestampTracker(),
placementDriverManager.placementDriver()
);
var indexManager = new IndexManager(schemaManager, tableManager, catalogManager, metaStorageMgr, registry);
var metricManager = new MetricManager();
SqlQueryProcessor qryEngine = new SqlQueryProcessor(
registry,
clusterSvc,
logicalTopologyService,
tableManager,
schemaManager,
dataStorageManager,
() -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()),
replicaSvc,
hybridClock,
schemaSyncService,
catalogManager,
metricManager,
new SystemViewManagerImpl(name, catalogManager),
placementDriverManager.placementDriver()
);
// The vault and the node configuration manager are registered and started ahead of all other components.
components.add(vault);
components.add(nodeCfgMgr);
// Start the vault, store the node name in it (blocking until the write completes), then start the node configuration.
vault.start();
vault.putName(name).join();
nodeCfgMgr.start();
// Start the remaining components in list order, registering each one right after it has been started.
// NOTE(review): placementDriverManager and metricManager are constructed above but are not part of this list -
// confirm that leaving them unstarted is intended.
List<IgniteComponent> otherComponents = List.of(
nettyBootstrapFactory,
clusterSvc,
raftMgr,
clusterStateStorage,
cmgManager,
replicaMgr,
txManager,
metaStorageMgr,
clusterCfgMgr,
dataStorageManager,
clockWaiter,
catalogManager,
schemaManager,
distributionZoneManager,
tableManager,
indexManager,
qryEngine
);
for (IgniteComponent component : otherComponents) {
component.start();
components.add(component);
}
// Wrap everything into a PartialNode and remember it in the partialNodes collection.
PartialNode partialNode = partialNode(
nodeCfgMgr,
clusterCfgMgr,
metaStorageMgr,
revisionCallback,
components,
localConfigurationGenerator,
logicalTopology,
cfgStorage,
distributedConfigurationGenerator,
clusterConfigRegistry,
hybridClock
);
partialNodes.add(partialNode);
return partialNode;
}
/**
* Returns a path to the partitions store directory. Creates a directory if it doesn't exist.
*
* @param workDir Ignite work directory.
* @return Partitions store path.
*/
private static Path getPartitionsStorePath(Path workDir) {
    // Partition data is kept in the "db" subdirectory of the given work directory.
    Path relativeDbDir = Paths.get("db");
    Path dbDir = workDir.resolve(relativeDbDir);

    try {
        // Also creates any missing parent directories; does nothing if the directory already exists.
        Files.createDirectories(dbDir);

        return dbDir;
    } catch (IOException e) {
        throw new IgniteInternalException("Failed to create directory for partitions storage: " + e.getMessage(), e);
    }
}
/**
* Starts a node with the given parameters.
*
* @param idx Node index.
* @param cfg Configuration string or {@code null} to use the default configuration.
* @return Created node instance.
*/
private IgniteImpl startNode(int idx, @Nullable String cfg) {
    // Must be sampled before startNodeAsync(), which registers the node name in CLUSTER_NODES_NAMES.
    boolean firstNodeInCluster = CLUSTER_NODES_NAMES.isEmpty();

    CompletableFuture<Ignite> startFuture = startNodeAsync(idx, cfg);

    if (firstNodeInCluster) {
        // The very first node initializes the cluster and is its only Meta Storage node.
        String initNodeName = CLUSTER_NODES_NAMES.get(0);

        TestIgnitionManager.init(
                InitParameters.builder()
                        .destinationNodeName(initNodeName)
                        .metaStorageNodeNames(List.of(initNodeName))
                        .clusterName("cluster")
                        .build()
        );
    }

    assertThat(startFuture, willCompleteSuccessfully());

    return (IgniteImpl) startFuture.join();
}
/**
* Starts a node with the given parameters.
*
* @param idx Node index.
* @return Created node instance.
*/
private IgniteImpl startNode(int idx) {
    // A null configuration string makes the two-argument overload use the default configuration.
    String defaultCfg = null;

    return startNode(idx, defaultCfg);
}
/**
* Starts a node with the given parameters. Does not run the Init command.
*
* @param idx Node index.
* @param cfg Configuration string or {@code null} to use the default configuration.
* @return Future that completes with a created node instance.
*/
private CompletableFuture<Ignite> startNodeAsync(int idx, @Nullable String cfg) {
    String name = testNodeName(testInfo, idx);

    String effectiveCfg;
    if (cfg != null) {
        effectiveCfg = cfg;
    } else {
        effectiveCfg = configurationString(idx);
    }

    boolean appendsNewSlot = idx == CLUSTER_NODES_NAMES.size();

    if (appendsNewSlot) {
        CLUSTER_NODES_NAMES.add(name);
    } else {
        // Reusing an existing slot is only allowed when it has been vacated (set to null) beforehand.
        assertNull(CLUSTER_NODES_NAMES.get(idx));

        CLUSTER_NODES_NAMES.set(idx, name);
    }

    Path nodeWorkDir = workDir.resolve(name);

    return TestIgnitionManager.start(name, effectiveCfg, nodeWorkDir);
}
/**
* Starts an {@code amount} number of nodes (with sequential indices starting from 0).
*/
private List<IgniteImpl> startNodes(int amount) {
    // Must be sampled before any node start is requested, because startNodeAsync() fills CLUSTER_NODES_NAMES.
    boolean firstStart = CLUSTER_NODES_NAMES.isEmpty();

    // Request the start of all nodes up front; the futures are awaited only after the optional cluster init.
    List<CompletableFuture<Ignite>> pendingStarts = new ArrayList<>();
    for (int i = 0; i < amount; i++) {
        pendingStarts.add(startNodeAsync(i, null));
    }

    if (firstStart) {
        String initNodeName = CLUSTER_NODES_NAMES.get(0);

        TestIgnitionManager.init(
                InitParameters.builder()
                        .destinationNodeName(initNodeName)
                        .metaStorageNodeNames(List.of(initNodeName))
                        .clusterName("cluster")
                        .build()
        );
    }

    List<IgniteImpl> startedNodes = new ArrayList<>();
    for (CompletableFuture<Ignite> pendingStart : pendingStarts) {
        assertThat(pendingStart, willCompleteSuccessfully());

        startedNodes.add((IgniteImpl) pendingStart.join());
    }

    return startedNodes;
}
/**
* Stop the node with given index.
*
* @param idx Node index.
*/
private void stopNode(int idx) {
    // Vacate the slot; a null previous value means there is nothing to stop at this index.
    String previousName = CLUSTER_NODES_NAMES.set(idx, null);

    if (previousName == null) {
        return;
    }

    IgnitionManager.stop(previousName);
}
/**
* Restarts empty node.
*/
@Test
public void emptyNodeTest() {
    IgniteImpl firstRun = startNode(0);

    int portBeforeRestart = firstRun.nodeConfiguration()
            .getConfiguration(NetworkConfiguration.KEY)
            .port()
            .value();

    assertEquals(DEFAULT_NODE_PORT, portBeforeRestart);

    stopNode(0);

    // The restarted node must come up with the same network port.
    IgniteImpl secondRun = startNode(0);

    int portAfterRestart = secondRun.nodeConfiguration()
            .getConfiguration(NetworkConfiguration.KEY)
            .port()
            .value();

    assertEquals(DEFAULT_NODE_PORT, portAfterRestart);
}
/**
* Check correctness of return results after node restart.
* Scenario:
* <ol>
* <li>Start two nodes and fill the data.</li>
* <li>Create index.</li>
* <li>Check explain contain index scan.</li>
* <li>Check return results.</li>
* <li>Restart one node.</li>
* <li>Run query and compare results.</li>
* </ol>
*/
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-19091")
public void testQueryCorrectnessAfterNodeRestart() throws InterruptedException {
    IgniteImpl ignite1 = startNode(0);

    createTableWithoutData(ignite1, TABLE_NAME, 2, 1);

    IgniteImpl ignite2 = startNode(1);

    String sql = "SELECT id FROM " + TABLE_NAME + " WHERE id > 0 ORDER BY id";

    // First value returned by the query before the restart; compared with the value returned after the restart.
    int intRes;

    try (Session session1 = ignite1.sql().createSession(); Session session2 = ignite2.sql().createSession()) {
        createTableWithData(List.of(ignite1), TABLE_NAME, 2, 1);

        session1.execute(null, "CREATE INDEX idx1 ON " + TABLE_NAME + "(id)");

        waitForIndexToBecomeAvailable(List.of(ignite1, ignite2), "idx1");

        // Result sets are closed through try-with-resources so that they are released even when an assertion fails.
        try (ResultSet<SqlRow> plan = session1.execute(null, "EXPLAIN PLAN FOR " + sql)) {
            String planStr = plan.next().stringValue(0);

            assertTrue(planStr.contains("IndexScan"), planStr);
        }

        try (ResultSet<SqlRow> res1 = session1.execute(null, sql);
                ResultSet<SqlRow> res2 = session2.execute(null, sql)) {
            intRes = res1.next().intValue(0);

            // Both nodes must return the same first row.
            assertEquals(intRes, res2.next().intValue(0));
        }
    }

    stopNode(0);

    ignite1 = startNode(0);

    try (Session session1 = ignite1.sql().createSession();
            ResultSet<SqlRow> res3 = session1.execute(null, sql)) {
        // The restarted node must return the same first row as before the restart.
        assertEquals(intRes, res3.next().intValue(0));
    }
}
/**
* Restarts a node with changing configuration.
*/
@Test
public void changeConfigurationOnStartTest() {
    IgniteImpl firstRun = startNode(0);

    int initialPort = firstRun.nodeConfiguration()
            .getConfiguration(NetworkConfiguration.KEY)
            .port()
            .value();

    assertEquals(DEFAULT_NODE_PORT, initialPort);

    stopNode(0);

    // Restart the node with a configuration that only overrides the network port.
    int overriddenPort = 3322;

    IgniteImpl secondRun = startNode(0, "network.port=" + overriddenPort);

    int actualPort = secondRun.nodeConfiguration()
            .getConfiguration(NetworkConfiguration.KEY)
            .port()
            .value();

    assertEquals(overriddenPort, actualPort);
}
/**
* Tests that a new node's attributes configuration is propagated after node restart.
*/
@Test
public void changeNodeAttributesConfigurationOnStartTest() {
    IgniteImpl firstRun = startNode(0);

    NodeAttributesConfiguration initialCfg = firstRun.nodeConfiguration().getConfiguration(NodeAttributesConfiguration.KEY);

    // Flatten the named list of attributes into a plain "attribute name -> attribute value" map.
    Map<String, String> initialAttributes = new HashMap<>();

    for (String key : initialCfg.nodeAttributes().value().namedListKeys()) {
        initialAttributes.put(key, initialCfg.nodeAttributes().get(key).attribute().value());
    }

    // A node started with the default configuration has no attributes.
    assertEquals(Collections.emptyMap(), initialAttributes);

    stopNode(0);

    String attributesHocon = "{\n"
            + " region.attribute = \"US\"\n"
            + " storage.attribute = \"SSD\"\n"
            + "}";

    IgniteImpl secondRun = startNode(0, "nodeAttributes.nodeAttributes=" + attributesHocon);

    NodeAttributesConfiguration restartedCfg = secondRun.nodeConfiguration().getConfiguration(NodeAttributesConfiguration.KEY);

    Map<String, String> restartedAttributes = new HashMap<>();

    for (String key : restartedCfg.nodeAttributes().value().namedListKeys()) {
        restartedAttributes.put(key, restartedCfg.nodeAttributes().get(key).attribute().value());
    }

    Map<String, String> expectedAttributes = Map.of("region", "US", "storage", "SSD");

    assertEquals(expectedAttributes, restartedAttributes);
}
/**
* Restarts the node which stores some data.
*/
@Test
public void nodeWithDataTest() {
    IgniteImpl firstRun = startNode(0);

    createTableWithData(List.of(firstRun), TABLE_NAME, 1);

    stopNode(0);

    // The table data must be readable on the restarted node.
    IgniteImpl secondRun = startNode(0);

    checkTableWithData(secondRun, TABLE_NAME);
}
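/**
* Checks that a restarted node recovers the Meta Storage entries that were written while it was stopped. When {@code useSnapshot} is
* {@code true}, snapshots are forced on the running node before the restart so that the restarting node has to request a snapshot.
*/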
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void metastorageRecoveryTest(boolean useSnapshot) throws InterruptedException {
    List<IgniteImpl> cluster = startNodes(2);

    IgniteImpl main = cluster.get(0);

    createTableWithData(List.of(main), TABLE_NAME, 1);

    stopNode(1);

    MetaStorageManager mainMetaStorage = main.metaStorageManager();

    int keyCount = 10;

    // Put some data to the MetaStorage so that there would be new entries to apply to the restarting node.
    CompletableFuture<?>[] putFutures = new CompletableFuture<?>[keyCount];

    for (int i = 0; i < keyCount; i++) {
        putFutures[i] = mainMetaStorage.put(ByteArray.fromString("some-test-key-" + i), new byte[]{(byte) i});
    }

    assertThat(CompletableFuture.allOf(putFutures), willSucceedFast());

    if (useSnapshot) {
        forceSnapshotUsageOnRestart(main);
    }

    IgniteImpl restarted = startNode(1);

    checkTableWithData(restarted, TABLE_NAME);

    MetaStorageManager restartedMetaStorage = restarted.metaStorageManager();

    // Every entry written while the node was down must be readable from its local storage: one byte equal to the key index.
    for (int i = 0; i < keyCount; i++) {
        byte[] stored = restartedMetaStorage.getLocally(ByteArray.fromString("some-test-key-" + i), 100).value();

        assertEquals(1, stored.length);
        assertEquals((byte) i, stored[0]);
    }
}
private static void forceSnapshotUsageOnRestart(IgniteImpl main) throws InterruptedException {
    // Force log truncation, so that restarting node would request a snapshot.
    JraftServerImpl raftServer = (JraftServerImpl) main.raftManager().server();

    // Pick the first local Meta Storage peer whose index is 0.
    Peer targetPeer = null;

    for (Peer candidate : raftServer.localPeers(MetastorageGroupId.INSTANCE)) {
        if (candidate.idx() == 0) {
            targetPeer = candidate;

            break;
        }
    }

    if (targetPeer == null) {
        throw new IllegalStateException("No leader peer");
    }

    RaftGroupService groupService = raftServer.raftGroupService(new RaftNodeId(MetastorageGroupId.INSTANCE, targetPeer));

    // Log must be truncated twice.
    for (int round = 0; round < 2; round++) {
        CountDownLatch finished = new CountDownLatch(1);
        AtomicReference<Status> outcomeRef = new AtomicReference<>();

        groupService.getRaftNode().snapshot(outcome -> {
            outcomeRef.set(outcome);
            finished.countDown();
        });

        boolean finishedInTime = finished.await(10, TimeUnit.SECONDS);

        assertTrue(finishedInTime, "Snapshot was not finished in time");

        Status snapshotOutcome = outcomeRef.get();

        assertTrue(snapshotOutcome.isOk(), "Snapshot failed: " + snapshotOutcome);
    }
}
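/**
* Restarts a node whose table data was partially flushed to disk: rows inserted after the flush must still be readable after the
* restart, which involves an index rebuild.
*/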
@Test
public void nodeWithDataAndIndexRebuildTest() {
    IgniteImpl firstRun = startNode(0);

    int partitions = 20;

    createTableWithData(List.of(firstRun), TABLE_NAME, 1, partitions);

    TableViewInternal tableBeforeRestart = (TableViewInternal) firstRun.tables().table(TABLE_NAME);

    InternalTableImpl internalTable = (InternalTableImpl) tableBeforeRestart.internalTable();

    // Flush data on disk, so that we will have a snapshot to read on restart.
    CompletableFuture<?>[] flushFutures = new CompletableFuture<?>[partitions];

    for (int partitionId = 0; partitionId < partitions; partitionId++) {
        flushFutures[partitionId] = internalTable.storage().getMvPartition(partitionId).flush();
    }

    assertThat(CompletableFuture.allOf(flushFutures), willCompleteSuccessfully());

    int extraRows = 100;
    int extraIdOffset = 500;

    // Add more data, so that on restart there will be a index rebuilding operation.
    try (Session session = firstRun.sql().createSession()) {
        for (int i = 0; i < extraRows; i++) {
            int id = i + extraIdOffset;

            session.execute(null, "INSERT INTO " + TABLE_NAME + "(id, name) VALUES (?, ?)", id, VALUE_PRODUCER.apply(id));
        }
    }

    stopNode(0);

    IgniteImpl secondRun = startNode(0);

    checkTableWithData(secondRun, TABLE_NAME);

    TableViewInternal tableAfterRestart = (TableViewInternal) secondRun.tables().table(TABLE_NAME);

    // Check data that was added after flush.
    for (int i = 0; i < extraRows; i++) {
        int id = i + extraIdOffset;

        Tuple row = tableAfterRestart.keyValueView().get(null, Tuple.create().set("id", id));

        Objects.requireNonNull(row, "row");

        assertEquals(VALUE_PRODUCER.apply(id), row.stringValue("name"));
    }
}
/**
* Starts two nodes and checks that the data is preserved across restarts. Nodes are restarted in the same order in which they were
* first started.
*/
@Test
public void testTwoNodesRestartDirect() {
    // Direct order: node 0 is restarted first, then node 1.
    boolean directOrder = true;

    twoNodesRestart(directOrder);
}
/**
* Starts two nodes and checks that the data is preserved across restarts. Nodes are restarted in the reverse of the order in which
* they were first started.
*/
@Test
public void testTwoNodesRestartReverse() {
    // Reverse order: the start of node 1 is requested before node 0 is started.
    boolean directOrder = false;

    twoNodesRestart(directOrder);
}
/**
* Starts two nodes and checks that the data is preserved across restarts.
*
* @param directOrder When the parameter is true, nodes restart in direct order, otherwise they restart in reverse order.
*/
private void twoNodesRestart(boolean directOrder) {
    List<IgniteImpl> cluster = startNodes(2);

    createTableWithData(cluster, TABLE_NAME, 2);
    createTableWithData(cluster, TABLE_NAME_2, 2);

    stopNode(0);
    stopNode(1);

    // The restarted node with index 1; the tables are verified through it.
    Ignite secondNode;

    if (!directOrder) {
        // Since the first node is the CMG leader, the second node can't be started synchronously (it won't be able to join the cluster
        // and the future will never resolve).
        CompletableFuture<Ignite> secondNodeFuture = startNodeAsync(1, null);

        startNode(0);

        assertThat(secondNodeFuture, willCompleteSuccessfully());

        secondNode = secondNodeFuture.join();
    } else {
        startNode(0);

        secondNode = startNode(1);
    }

    checkTableWithData(secondNode, TABLE_NAME);
    checkTableWithData(secondNode, TABLE_NAME_2);
}
/**
* Check that the table with given name is present in TableManager.
*
* @param tableManager Table manager.
* @param tableName Table name.
*/
private void assertTablePresent(TableManager tableManager, String tableName) {
    Collection<TableImpl> knownTables = tableManager.latestTables().values();

    // Exact, case-sensitive match on the table name.
    boolean found = knownTables.stream().anyMatch(candidate -> candidate.name().equals(tableName));

    assertTrue(found, "tableName=" + tableName + ", tables=" + knownTables);
}
/**
* Checks that one node in a cluster of 2 nodes is able to restart and recover a table that was created when this node was absent. Also
* checks that the table created before the node stop is not available when the majority is lost.
*/
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-20137")
public void testOneNodeRestartWithGap() {
    IgniteImpl survivor = startNode(0);

    startNode(1);

    createTableWithData(List.of(survivor), TABLE_NAME, 2);

    stopNode(1);

    Table table = survivor.tables().table(TABLE_NAME);

    assertNotNull(table);

    // With the second node stopped, reading from the table is expected to fail.
    assertThrows(TransactionException.class, () -> table.keyValueView().get(null, Tuple.create().set("id", 0)));

    // This table is created while the second node is absent.
    createTableWithoutData(survivor, TABLE_NAME_2, 1, 1);

    IgniteImpl rejoined = startNode(1);

    TableManager tableManager = (TableManager) rejoined.tables();

    assertNotNull(tableManager);

    for (String expectedTable : List.of(TABLE_NAME, TABLE_NAME_2)) {
        assertTablePresent(tableManager, expectedTable.toUpperCase());
    }
}
/**
* Checks that the table created in cluster of 2 nodes, is recovered on a node after restart of this node.
*/
@Test
public void testRecoveryOnOneNode() {
    IgniteImpl creator = startNode(0);

    // The second node only has to be part of the cluster while the table is created.
    startNode(1);

    createTableWithData(List.of(creator), TABLE_NAME, 2, 1);

    stopNode(1);

    IgniteImpl restarted = startNode(1);

    TableManager tableManager = (TableManager) restarted.tables();

    assertNotNull(tableManager);

    assertTablePresent(tableManager, TABLE_NAME.toUpperCase());
}
/**
 * Checks that a cluster is able to restart when some changes were made in the node configuration: the second node comes back with
 * all of its ports shifted.
 */
@Test
public void testRestartDiffConfig() {
    List<IgniteImpl> initialNodes = startNodes(2);

    createTableWithData(initialNodes, TABLE_NAME, 2);
    createTableWithData(initialNodes, TABLE_NAME_2, 2);

    stopNode(0);
    stopNode(1);

    startNode(0);

    // Offset applied to every port of the restarted node so that its bootstrap configuration differs from the original one.
    int portShift = 11;

    String seedNodes = "[\"localhost:" + DEFAULT_NODE_PORT + "\"]";

    @Language("HOCON") String shiftedCfg = IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG,
            DEFAULT_NODE_PORT + portShift,
            seedNodes,
            DEFAULT_CLIENT_PORT + portShift,
            DEFAULT_HTTP_PORT + portShift,
            DEFAULT_HTTPS_PORT + portShift
    );

    IgniteImpl reconfigured = startNode(1, shiftedCfg);

    TableManager reconfiguredTables = (TableManager) reconfigured.tables();

    assertTablePresent(reconfiguredTables, TABLE_NAME.toUpperCase());
}
/**
 * Tests a node restart when there is a gap between the node local configuration and the distributed configuration: tables are
 * created while the last node is stopped, and the restarted node must see the tables afterwards.
 */
@Test
public void testCfgGapWithoutData() {
    List<IgniteImpl> nodes = startNodes(3);

    createTableWithData(nodes, TABLE_NAME, nodes.size());

    log.info("Stopping the node.");

    stopNode(nodes.size() - 1);

    int lastIdx = nodes.size() - 1;

    // Keep the list size intact but drop the reference to the stopped node.
    nodes.set(lastIdx, null);

    // These tables are created while the last node is down, which produces the configuration gap.
    createTableWithData(nodes, TABLE_NAME_2, nodes.size());
    createTableWithData(nodes, TABLE_NAME_2 + "0", nodes.size());

    log.info("Starting the node.");

    IgniteImpl restarted = startNode(lastIdx, null);

    log.info("After starting the node.");

    TableManager restartedTables = (TableManager) restarted.tables();

    assertTablePresent(restartedTables, TABLE_NAME.toUpperCase());
    assertTablePresent(restartedTables, TABLE_NAME_2.toUpperCase());
}
/**
 * Tests a node restart when there is a gap between the node local configuration and the distributed configuration, and the
 * metastorage group stops for some time while the restarting node is being recovered. The recovery process should continue and
 * eventually succeed after the metastorage group starts again.
 */
@Test
@Disabled(value = "https://issues.apache.org/jira/browse/IGNITE-18919")
public void testMetastorageStop() {
    int cfgGap = 4;

    // Revision at which node 0 is bounced while the partial node is still applying revisions.
    int restartRevision = cfgGap / 2;

    List<IgniteImpl> nodes = startNodes(3);

    log.info("Stopping the node.");

    stopNode(nodes.size() - 1);

    int lastIdx = nodes.size() - 1;

    nodes.set(lastIdx, null);

    for (int tableIdx = 0; tableIdx < cfgGap; tableIdx++) {
        createTableWithData(nodes, "t" + tableIdx, nodes.size(), 1);
    }

    log.info("Starting the node.");

    PartialNode partialNode = startPartialNode(
            lastIdx,
            configurationString(lastIdx),
            rev -> {
                log.info("Partially started node: applying revision: " + rev);

                if (rev != restartRevision) {
                    return;
                }

                log.info("Stopping METASTORAGE");

                stopNode(0);

                log.info("Starting METASTORAGE");

                startNode(0);

                log.info("Restarted METASTORAGE");
            }
    );

    TableManager recoveredTables = findComponent(partialNode.startedComponents(), TableManager.class);

    for (int tableIdx = 0; tableIdx < cfgGap; tableIdx++) {
        assertTablePresent(recoveredTables, "T" + tableIdx);
    }
}
/**
 * Tests a node restart when there is a gap between the node local configuration and the distributed configuration, validating the
 * table data both on a node that stayed alive and on the restarted node.
 */
@Test
public void testCfgGap() {
    List<IgniteImpl> nodes = startNodes(4);

    createTableWithData(nodes, "t1", nodes.size());

    log.info("Stopping the node.");

    stopNode(nodes.size() - 1);

    int lastIdx = nodes.size() - 1;

    nodes.set(lastIdx, null);

    checkTableWithData(nodes.get(0), "t1");

    // Created while the last node is down, so the restarted node has to catch up on it.
    createTableWithData(nodes, "t2", nodes.size());

    log.info("Starting the node.");

    IgniteImpl restarted = startNode(lastIdx);

    IgniteImpl firstNode = nodes.get(0);

    checkTableWithData(firstNode, "t1");
    checkTableWithData(firstNode, "t2");

    checkTableWithData(restarted, "t1");
    checkTableWithData(restarted, "t2");
}
/**
 * Tests updating the cluster configuration with a value equal to the current (default) one.
 * Checks that new nodes are then able to start and synchronize the local cluster configuration.
 */
@Test
public void updateClusterCfgWithDefaultValue() {
    IgniteImpl firstNode = startNode(0);

    GcConfiguration gcCfg = firstNode.clusterConfiguration().getConfiguration(GcConfiguration.KEY);

    // Read the current batch size and write the very same value back.
    int currentBatchSize = gcCfg.batchSize().value();

    CompletableFuture<Void> updateFut = gcCfg.batchSize().update(currentBatchSize);

    assertThat(updateFut, willCompleteSuccessfully());

    stopNode(0);

    // The test passes if the cluster starts without errors after the update.
    startNodes(3);
}
/**
 * Checks that the table exists and validates all data in it.
 *
 * <p>For each key from 0 to 99 the row is polled every 100 ms for at most 30 seconds until it can be read; the {@code name} column
 * must then equal the value produced by {@code VALUE_PRODUCER} for that key.
 *
 * @param ignite Ignite.
 * @param name Table name.
 */
private static void checkTableWithData(Ignite ignite, String name) {
    Table table = ignite.tables().table(name);

    assertNotNull(table);

    for (int id = 0; id < 100; id++) {
        // Lambdas can only capture effectively final locals, hence the copy of the loop counter.
        int key = id;

        Awaitility.with()
                .await()
                .pollInterval(100, TimeUnit.MILLISECONDS)
                .pollDelay(0, TimeUnit.MILLISECONDS)
                .atMost(30, TimeUnit.SECONDS)
                .until(() -> {
                    try {
                        Tuple row = table.keyValueView().get(null, Tuple.create().set("id", key));

                        if (row == null) {
                            return false;
                        }

                        assertEquals(VALUE_PRODUCER.apply(key), row.stringValue("name"));

                        return true;
                    } catch (TransactionException ignored) {
                        // There may be an exception if the primary replica node was stopped. Keep polling until a new primary appears.
                        return false;
                    }
                });
    }
}
/**
 * Creates a table with the default number of partitions (2) and loads data into it.
 *
 * @param nodes Ignite nodes.
 * @param name Table name.
 * @param replicas Replica factor.
 */
private void createTableWithData(List<IgniteImpl> nodes, String name, int replicas) {
    int defaultPartitions = 2;

    createTableWithData(nodes, name, replicas, defaultPartitions);
}
/**
 * Creates a distribution zone and a table in it (both only if they do not exist yet), then inserts 100 rows with ids from 0 to 99
 * whose {@code name} values come from {@code VALUE_PRODUCER}. All statements are executed through the first node of the list.
 *
 * @param nodes Ignite nodes.
 * @param name Table name.
 * @param replicas Replica factor.
 * @param partitions Partitions count.
 */
private void createTableWithData(List<IgniteImpl> nodes, String name, int replicas, int partitions) {
    try (Session session = nodes.get(0).sql().createSession()) {
        String createZoneSql =
                String.format("CREATE ZONE IF NOT EXISTS ZONE_%s WITH REPLICAS=%d, PARTITIONS=%d", name, replicas, partitions);

        session.execute(null, createZoneSql);

        String createTableSql = "CREATE TABLE IF NOT EXISTS " + name
                + "(id INT PRIMARY KEY, name VARCHAR) WITH PRIMARY_ZONE='ZONE_" + name.toUpperCase() + "';";

        session.execute(null, createTableSql);

        String insertSql = "INSERT INTO " + name + "(id, name) VALUES (?, ?)";

        for (int id = 0; id < 100; id++) {
            session.execute(null, insertSql, id, VALUE_PRODUCER.apply(id));
        }
    }
}
/**
 * Waits until every given node reports, through its catalog manager, that the index with the given name has become available.
 *
 * <p>A {@code CatalogEvent.INDEX_AVAILABLE} listener is registered on each node; the method fails the test if not all of the nodes
 * have reported the index within 10 seconds.
 *
 * @param nodes Nodes to wait on.
 * @param indexName Index name, compared case-insensitively.
 * @throws InterruptedException If the waiting thread is interrupted.
 */
private void waitForIndexToBecomeAvailable(Collection<IgniteImpl> nodes, String indexName) throws InterruptedException {
    CountDownLatch allNodesReported = new CountDownLatch(nodes.size());

    for (IgniteImpl node : nodes) {
        node.catalogManager().listen(CatalogEvent.INDEX_AVAILABLE, (event, ex) -> {
            if (ex != null) {
                return failedFuture(ex);
            }

            MakeIndexAvailableEventParameters availableEvent = (MakeIndexAvailableEventParameters) event;

            CatalogIndexDescriptor index = node.catalogManager().index(availableEvent.indexId(), event.catalogVersion());

            assertNotNull(index, "Cannot find an index by ID=" + availableEvent.indexId());

            if (!index.name().equalsIgnoreCase(indexName)) {
                // Some other index; keep waiting for ours.
                return completedFuture(false);
            }

            // That's our index.
            // NOTE(review): completing with 'true' presumably unsubscribes this listener - confirm against the event API.
            allNodesReported.countDown();

            return completedFuture(true);
        });
    }

    assertTrue(allNodesReported.await(10, TimeUnit.SECONDS));
}
/**
 * Creates a distribution zone (if it does not exist yet) and a table in it, without inserting any rows.
 *
 * @param ignite Ignite.
 * @param name Table name.
 * @param replicas Replica factor.
 * @param partitions Partitions count.
 * @return The table looked up by name after creation.
 */
private static Table createTableWithoutData(Ignite ignite, String name, int replicas, int partitions) {
    try (Session session = ignite.sql().createSession()) {
        String createZoneSql =
                String.format("CREATE ZONE IF NOT EXISTS ZONE_%s WITH REPLICAS=%d, PARTITIONS=%d", name, replicas, partitions);

        session.execute(null, createZoneSql);

        String createTableSql = "CREATE TABLE " + name
                + "(id INT PRIMARY KEY, name VARCHAR) WITH PRIMARY_ZONE='ZONE_" + name.toUpperCase() + "';";

        session.execute(null, createTableSql);
    }

    return ignite.tables().table(name);
}
}