blob: 3e71355315f74288d8cc19761aa8331637dce713 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.metastorage.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.network.NodeMetadata;
import org.apache.ignite.network.TopologyService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
/** Tests MetaStorage manager recovery basics. */
@ExtendWith(ConfigurationExtension.class)
public class MetaStorageManagerRecoveryTest extends BaseIgniteAbstractTest {
private static final String NODE_NAME = "node";
private static final String LEADER_NAME = "ms-leader";
@InjectConfiguration
private static MetaStorageConfiguration metaStorageConfiguration;
private MetaStorageManagerImpl metaStorageManager;
private KeyValueStorage kvs;
private HybridClock clock;
private void createMetaStorage(long remoteRevision) throws Exception {
ClusterService clusterService = clusterService();
ClusterManagementGroupManager cmgManager = clusterManagementManager();
LogicalTopologyService topologyService = mock(LogicalTopologyService.class);
RaftManager raftManager = raftManager(remoteRevision);
clock = new HybridClockImpl();
kvs = spy(new SimpleInMemoryKeyValueStorage(NODE_NAME));
metaStorageManager = new MetaStorageManagerImpl(
clusterService,
cmgManager,
topologyService,
raftManager,
kvs,
clock,
mock(TopologyAwareRaftGroupServiceFactory.class),
metaStorageConfiguration
);
}
private RaftManager raftManager(long remoteRevision) throws Exception {
RaftManager raft = mock(RaftManager.class);
RaftGroupService service = mock(RaftGroupService.class);
when(service.run(any(GetCurrentRevisionCommand.class)))
.thenAnswer(invocation -> completedFuture(remoteRevision));
when(raft.startRaftGroupNodeAndWaitNodeReadyFuture(any(), any(), any(), any(), any()))
.thenAnswer(invocation -> completedFuture(service));
return raft;
}
private ClusterService clusterService() {
return new ClusterService() {
@Override
public String nodeName() {
return "node";
}
@Override
public TopologyService topologyService() {
return null;
}
@Override
public MessagingService messagingService() {
return null;
}
@Override
public MessageSerializationRegistry serializationRegistry() {
return null;
}
@Override
public boolean isStopped() {
return false;
}
@Override
public void updateMetadata(NodeMetadata metadata) {
}
@Override
public CompletableFuture<Void> startAsync() {
return nullCompletedFuture();
}
};
}
private ClusterManagementGroupManager clusterManagementManager() {
ClusterManagementGroupManager mock = mock(ClusterManagementGroupManager.class);
when(mock.metaStorageNodes())
.thenAnswer(invocation -> completedFuture(Set.of(LEADER_NAME)));
return mock;
}
@Test
void testRecoverToRevision() throws Exception {
long targetRevision = 10;
createMetaStorage(targetRevision);
assertThat(metaStorageManager.startAsync(), willCompleteSuccessfully());
CompletableFuture<Void> msDeployFut = metaStorageManager.deployWatches();
for (int i = 0; i < targetRevision; i++) {
kvs.put(new byte[0], new byte[0], clock.now());
}
assertThat(msDeployFut, willSucceedFast());
// MetaStorage recovered to targetRevision and started watching targetRevision + 1.
verify(kvs).startWatches(eq(targetRevision + 1), any());
}
@Test
void testRecoverClean() throws Exception {
createMetaStorage(0);
assertThat(metaStorageManager.startAsync(), willCompleteSuccessfully());
CompletableFuture<Void> msDeployFut = metaStorageManager.deployWatches();
assertThat(msDeployFut, willSucceedFast());
// MetaStorage is at revision 0 and started watching revision 1.
verify(kvs).startWatches(eq(1L), any());
}
}