blob: 7ed26955ddd0fe8edbd570fe61f9f85457465d88 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage;
import java.io.Serializable;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.ignite.configuration.RootKey;
import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.storage.Data;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
* Tests for the {@link DistributedConfigurationStorage}.
*/
@ExtendWith(WorkDirectoryExtension.class)
public class ITDistributedConfigurationStorageTest {
/**
* An emulation of an Ignite node, that only contains components necessary for tests.
*/
private static class Node {
/** */
private final NetworkAddress addr = new NetworkAddress("localhost", 10000);
/** */
private final VaultManager vaultManager;
/** */
private final ClusterService clusterService;
/** */
private final Loza raftManager;
/** */
private final ConfigurationManager cfgManager;
/** */
private final MetaStorageManager metaStorageManager;
/** */
private final DistributedConfigurationStorage cfgStorage;
/**
* Constructor that simply creates a subset of components of this node.
*/
Node(Path workDir) {
vaultManager = new VaultManager(new PersistentVaultService(workDir.resolve("vault")));
clusterService = ClusterServiceTestUtils.clusterService(
addr.toString(),
addr.port(),
new StaticNodeFinder(List.of(addr)),
new MessageSerializationRegistryImpl(),
new TestScaleCubeClusterServiceFactory()
);
raftManager = new Loza(clusterService, workDir);
List<RootKey<?, ?>> rootKeys = List.of(NodeConfiguration.KEY);
cfgManager = new ConfigurationManager(
rootKeys,
Map.of(),
new LocalConfigurationStorage(vaultManager),
List.of()
);
metaStorageManager = new MetaStorageManager(
vaultManager,
cfgManager,
clusterService,
raftManager
);
cfgStorage = new DistributedConfigurationStorage(metaStorageManager, vaultManager);
}
/**
* Starts the created components.
*/
void start() throws Exception {
vaultManager.start();
cfgManager.start();
// metastorage configuration
var config = String.format("{\"node\": {\"metastorageNodes\": [ \"%s\" ]}}", addr);
cfgManager.bootstrap(config);
Stream.of(clusterService, raftManager, metaStorageManager).forEach(IgniteComponent::start);
// this is needed to avoid assertion errors
cfgStorage.registerConfigurationListener(changedEntries -> completedFuture(null));
// deploy watches to propagate data from the metastore into the vault
metaStorageManager.deployWatches();
}
/**
* Stops the created components.
*/
void stop() throws Exception {
var components =
List.of(metaStorageManager, raftManager, clusterService, cfgManager, vaultManager);
for (IgniteComponent igniteComponent : components)
igniteComponent.beforeNodeStop();
for (IgniteComponent component : components)
component.stop();
}
}
/**
* Tests a scenario when a node is restarted with an existing PDS folder. A node is started and some data is written
* to the distributed configuration storage. We then expect that the same data can be read by the node after
* restart.
*
* @see <a href="https://issues.apache.org/jira/browse/IGNITE-15213">IGNITE-15213</a>
*/
@Test
void testRestartWithPds(@WorkDirectory Path workDir) throws Exception {
var node = new Node(workDir);
Map<String, Serializable> data = Map.of("foo", "bar");
try {
node.start();
assertThat(node.cfgStorage.write(data, 0), willBe(equalTo(true)));
waitForCondition(() -> Objects.nonNull(node.vaultManager.get(APPLIED_REV).join().value()), 3000);
}
finally {
node.stop();
}
var node2 = new Node(workDir);
try {
node2.start();
Data storageData = node2.cfgStorage.readAll();
assertThat(storageData.values(), equalTo(data));
}
finally {
node2.stop();
}
}
}