blob: fc5632baa92269bec507c4d803665dc8e2747018 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.metastorage.client;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDBKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl.DelegatingStateMachine;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static java.lang.Thread.sleep;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Persistent (rocksdb-based) meta storage client tests.
*/
@ExtendWith(WorkDirectoryExtension.class)
public class ITMetaStorageServicePersistenceTest {
/** Starting server port. */
private static final int PORT = 5003;
/** Starting client port. */
private static final int CLIENT_PORT = 6003;
/**
* Peers list.
*/
private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
.mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
.map(Peer::new)
.collect(Collectors.toUnmodifiableList());
/** */
private static final String METASTORAGE_RAFT_GROUP_NAME = "METASTORAGE_RAFT_GROUP";
/** Factory. */
private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
/** Network factory. */
private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
/** */
private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
/** */
@WorkDirectory
private Path workDir;
/** Cluster. */
private final List<ClusterService> cluster = new ArrayList<>();
/** Servers. */
private final List<JRaftServerImpl> servers = new ArrayList<>();
/** Clients. */
private final List<RaftGroupService> clients = new ArrayList<>();
/**
* Shutdown raft server and stop all cluster nodes.
*
* @throws Exception If failed to shutdown raft server,
*/
@AfterEach
public void afterTest() throws Exception {
for (RaftGroupService client : clients)
client.shutdown();
for (JRaftServerImpl server : servers)
server.stop();
for (ClusterService service : cluster)
service.stop();
}
/**
* Test parameters for {@link #testSnapshot}.
*/
private static class TestData {
/** Delete raft group folder. */
private final boolean deleteFolder;
/** Write to meta storage after a snapshot. */
private final boolean writeAfterSnapshot;
/** */
private TestData(boolean deleteFolder, boolean writeAfterSnapshot) {
this.deleteFolder = deleteFolder;
this.writeAfterSnapshot = writeAfterSnapshot;
}
/** {@inheritDoc} */
@Override public String toString() {
return String.format("deleteFolder=%s, writeAfterSnapshot=%s", deleteFolder, writeAfterSnapshot);
}
}
/**
* @return {@link #testSnapshot} parameters.
*/
private static List<TestData> testSnapshotData() {
return List.of(
new TestData(false, false),
new TestData(true, true),
new TestData(false, true),
new TestData(true, false)
);
}
/**
* Tests that a joining raft node successfully restores a snapshot.
*
* @param testData Test parameters.
* @throws Exception If failed.
*/
@ParameterizedTest
@MethodSource("testSnapshotData")
@Disabled("https://issues.apache.org/jira/browse/IGNITE-15298")
public void testSnapshot(TestData testData) throws Exception {
ByteArray firstKey = ByteArray.fromString("first");
byte[] firstValue = "firstValue".getBytes(StandardCharsets.UTF_8);
// Setup a metastorage raft service
RaftGroupService metaStorageSvc = prepareMetaStorage();
MetaStorageServiceImpl metaStorage = new MetaStorageServiceImpl(metaStorageSvc, null);
// Put some data in the metastorage
metaStorage.put(firstKey, firstValue).get();
// Check that data has been written successfully
check(metaStorage, new EntryImpl(firstKey, firstValue, 1, 1));;
// Select any node that is not the leader of the group
JRaftServerImpl toStop = servers.stream()
.filter(server -> !server.localPeer(METASTORAGE_RAFT_GROUP_NAME).equals(metaStorageSvc.leader()))
.findAny()
.orElseThrow();
// Get the path to that node's raft directory
Path serverDataPath = toStop.getServerDataPath(METASTORAGE_RAFT_GROUP_NAME);
// Get the path to that node's RocksDB key-value storage
Path dbPath = getStorage(toStop).getDbPath();
int stopIdx = servers.indexOf(toStop);
// Remove that node from the list of servers
servers.remove(stopIdx);
// Shutdown that node
toStop.stop();
// Create a raft snapshot of the metastorage service
metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
// Remove the first key from the metastorage
metaStorage.remove(firstKey).get();
// Check that data has been removed
check(metaStorage, new EntryImpl(firstKey, null, 2, 2));
// Put same data again
metaStorage.put(firstKey, firstValue).get();
// Check that it has been written
check(metaStorage, new EntryImpl(firstKey, firstValue, 3, 3));
// Create another raft snapshot
metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
byte[] lastKey = firstKey.bytes();
byte[] lastValue = firstValue;
if (testData.deleteFolder) {
// Delete stopped node's raft directory and key-value storage directory
// to check if snapshot could be restored by the restarted node
IgniteUtils.deleteIfExists(dbPath);
IgniteUtils.deleteIfExists(serverDataPath);
}
if (testData.writeAfterSnapshot) {
// Put new data after the second snapshot to check if after
// the snapshot restore operation restarted node will receive it
ByteArray secondKey = ByteArray.fromString("second");
byte[] secondValue = "secondValue".getBytes(StandardCharsets.UTF_8);
metaStorage.put(secondKey, secondValue).get();
lastKey = secondKey.bytes();
lastValue = secondValue;
}
// Restart the node
JRaftServerImpl restarted = startServer(stopIdx, new RocksDBKeyValueStorage(dbPath));
assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
KeyValueStorage storage = getStorage(restarted);
byte[] finalLastKey = lastKey;
int expectedRevision = testData.writeAfterSnapshot ? 4 : 3;
int expectedUpdateCounter = testData.writeAfterSnapshot ? 4 : 3;
EntryImpl expectedLastEntry = new EntryImpl(new ByteArray(lastKey), lastValue, expectedRevision, expectedUpdateCounter);
// Wait until the snapshot is restored
boolean success = waitForCondition(() -> {
org.apache.ignite.internal.metastorage.server.Entry e = storage.get(finalLastKey);
return e.empty() == expectedLastEntry.empty()
&& e.tombstone() == expectedLastEntry.tombstone()
&& e.revision() == expectedLastEntry.revision()
&& e.updateCounter() == expectedLastEntry.revision()
&& Arrays.equals(e.key(), expectedLastEntry.key().bytes())
&& Arrays.equals(e.value(), expectedLastEntry.value());
}, 3_000);
assertTrue(success);
// Check that the last value has been written successfully
check(metaStorage, expectedLastEntry);
}
/**
* Get the meta store's key-value storage of the jraft server.
*
* @param server Server.
* @return Meta store's key value storage.
*/
private static RocksDBKeyValueStorage getStorage(JRaftServerImpl server) {
org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(METASTORAGE_RAFT_GROUP_NAME);
DelegatingStateMachine fsm = (DelegatingStateMachine) svc.getRaftNode().getOptions().getFsm();
MetaStorageListener listener = (MetaStorageListener) fsm.getListener();
KeyValueStorage storage = listener.getStorage();
return (RocksDBKeyValueStorage) storage;
}
/**
* Check meta storage entry.
*
* @param metaStorage Meta storage service.
* @param expected Expected entry.
* @throws ExecutionException If failed.
* @throws InterruptedException If failed.
*/
private void check(MetaStorageServiceImpl metaStorage, EntryImpl expected)
throws ExecutionException, InterruptedException {
Entry entry = metaStorage.get(expected.key()).get();
assertEquals(expected, entry);
}
/** */
@SuppressWarnings("BusyWait") private static boolean waitForCondition(BooleanSupplier cond, long timeout) {
long stop = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < stop) {
if (cond.getAsBoolean())
return true;
try {
sleep(50);
}
catch (InterruptedException e) {
return false;
}
}
return false;
}
/**
* @param cluster The cluster.
* @param exp Expected count.
* @param timeout The timeout in millis.
* @return {@code True} if topology size is equal to expected.
*/
private boolean waitForTopology(ClusterService cluster, int exp, int timeout) {
return waitForCondition(() -> cluster.topologyService().allMembers().size() >= exp, timeout);
}
/**
* @return Local address.
*/
private static String getLocalAddress() {
try {
return InetAddress.getLocalHost().getHostAddress();
}
catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
/**
* Creates a cluster service.
*/
private ClusterService clusterService(String name, int port, NetworkAddress otherPeer) {
var nodeFinder = new StaticNodeFinder(List.of(otherPeer));
var network = ClusterServiceTestUtils.clusterService(
name,
port,
nodeFinder,
SERIALIZATION_REGISTRY,
NETWORK_FACTORY
);
network.start();
cluster.add(network);
return network;
}
/**
* Starts a raft server.
*
* @param idx Server index (affects port of the server).
* @param storage KeyValueStorage for the MetaStorage.
* @return Server.
*/
private JRaftServerImpl startServer(int idx, KeyValueStorage storage) {
var addr = new NetworkAddress(getLocalAddress(), PORT);
ClusterService service = clusterService("server" + idx, PORT + idx, addr);
Path jraft = workDir.resolve("jraft" + idx);
JRaftServerImpl server = new JRaftServerImpl(service, jraft) {
@Override public void stop() {
super.stop();
service.stop();
}
};
server.start();
server.startRaftGroup(
METASTORAGE_RAFT_GROUP_NAME,
new MetaStorageListener(storage),
INITIAL_CONF
);
servers.add(server);
return server;
}
/**
* Prepares meta storage by instantiating corresponding raft server with {@link MetaStorageListener} and
* a client.
*
* @return Meta storage raft group service instance.
* @throws Exception If failed.
*/
private RaftGroupService prepareMetaStorage() throws Exception {
for (int i = 0; i < INITIAL_CONF.size(); i++)
startServer(i, new RocksDBKeyValueStorage(workDir.resolve(UUID.randomUUID().toString())));
assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
return startClient(METASTORAGE_RAFT_GROUP_NAME, new NetworkAddress(getLocalAddress(), PORT));
}
/**
* Starts a client with a specific address.
*
* @throws Exception If failed.
*/
private RaftGroupService startClient(String groupId, NetworkAddress addr) throws Exception {
ClusterService clientNode = clusterService("client_" + groupId + "_", CLIENT_PORT + clients.size(), addr);
RaftGroupService client = RaftGroupServiceImpl.start(groupId, clientNode, FACTORY, 10_000,
List.of(new Peer(addr)), false, 200).get(3, TimeUnit.SECONDS);
clients.add(client);
return client;
}
}