blob: a44ded2b20b1f3c1897c32257e059fa566990fda [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.configuration.storage;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.Serializable;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
/**
* Tests for the {@link DistributedConfigurationStorage}.
*/
@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ConfigurationExtension.class)
public class ItDistributedConfigurationStorageTest extends BaseIgniteAbstractTest {
@InjectConfiguration
private static RaftConfiguration raftConfiguration;
@InjectConfiguration
private static ClusterManagementConfiguration clusterManagementConfiguration;
@InjectConfiguration
private static NodeAttributesConfiguration nodeAttributes;
@InjectConfiguration
private static StorageConfiguration storageConfiguration;
@InjectConfiguration
private static MetaStorageConfiguration metaStorageConfiguration;
/**
* An emulation of an Ignite node, that only contains components necessary for tests.
*/
private static class Node {
private final VaultManager vaultManager;
private final ClusterService clusterService;
private final ClusterManagementGroupManager cmgManager;
private final Loza raftManager;
private final MetaStorageManager metaStorageManager;
private final DistributedConfigurationStorage cfgStorage;
/** The future have to be complete after the node start and all Meta storage watches are deployd. */
private final CompletableFuture<Void> deployWatchesFut;
/**
* Constructor that simply creates a subset of components of this node.
*/
Node(TestInfo testInfo, Path workDir) {
var addr = new NetworkAddress("localhost", 10000);
vaultManager = new VaultManager(new PersistentVaultService(workDir.resolve("vault")));
clusterService = ClusterServiceTestUtils.clusterService(
testInfo,
addr.port(),
new StaticNodeFinder(List.of(addr))
);
HybridClock clock = new HybridClockImpl();
var raftGroupEventsClientListener = new RaftGroupEventsClientListener();
raftManager = new Loza(clusterService, raftConfiguration, workDir, clock, raftGroupEventsClientListener);
var clusterStateStorage = new TestClusterStateStorage();
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var clusterInitializer = new ClusterInitializer(
clusterService,
hocon -> hocon,
new TestConfigurationValidator()
);
cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
clusterInitializer,
raftManager,
clusterStateStorage,
logicalTopology,
clusterManagementConfiguration,
new NodeAttributesCollector(nodeAttributes, storageConfiguration)
);
var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);
var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
clusterService,
logicalTopologyService,
Loza.FACTORY,
raftGroupEventsClientListener
);
metaStorageManager = new MetaStorageManagerImpl(
clusterService,
cmgManager,
logicalTopologyService,
raftManager,
new SimpleInMemoryKeyValueStorage(name()),
clock,
topologyAwareRaftGroupServiceFactory,
metaStorageConfiguration
);
deployWatchesFut = metaStorageManager.deployWatches();
cfgStorage = new DistributedConfigurationStorage("test", metaStorageManager);
}
/**
* Starts the created components.
*/
void start() {
assertThat(
startAsync(vaultManager, clusterService, raftManager, cmgManager, metaStorageManager),
willCompleteSuccessfully()
);
// this is needed to avoid assertion errors
cfgStorage.registerConfigurationListener(changedEntries -> nullCompletedFuture());
}
/**
* Waits for watches deployed.
*/
void waitWatches() {
assertThat("Watches were not deployed", deployWatchesFut, willCompleteSuccessfully());
}
/**
* Stops the created components.
*/
void stop() {
var components =
List.of(metaStorageManager, cmgManager, raftManager, clusterService, vaultManager);
for (IgniteComponent igniteComponent : components) {
igniteComponent.beforeNodeStop();
}
assertThat(stopAsync(components), willCompleteSuccessfully());
}
String name() {
return clusterService.nodeName();
}
}
/**
* Tests a scenario when a node is restarted with an existing PDS folder. A node is started and some data is written to the distributed
* configuration storage. We then expect that the same data can be read by the node after restart.
*
* @see <a href="https://issues.apache.org/jira/browse/IGNITE-15213">IGNITE-15213</a>
*/
@Test
void testRestartWithPds(@WorkDirectory Path workDir, TestInfo testInfo) throws Exception {
var node = new Node(testInfo, workDir);
Map<String, Serializable> data = Map.of("foo", "bar");
try {
node.start();
node.cmgManager.initCluster(List.of(node.name()), List.of(), "cluster");
node.waitWatches();
assertThat(node.cfgStorage.write(data, 0), willBe(equalTo(true)));
assertTrue(waitForCondition(
() -> node.metaStorageManager.appliedRevision() != 0,
3000
));
} finally {
node.stop();
}
var node2 = new Node(testInfo, workDir);
try {
node2.start();
node2.waitWatches();
CompletableFuture<Data> storageData = node2.cfgStorage.readDataOnRecovery();
assertThat(storageData.thenApply(Data::values), willBe(equalTo(data)));
} finally {
node2.stop();
}
}
}