blob: 9293cd1fd2808e5c6b00a1f0e01ebb4b1cf5b919 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.cluster.management.raft;
import static org.apache.ignite.internal.cluster.management.ClusterTag.clusterTag;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.Serializable;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.LongConsumer;
import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.ClusterTag;
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.cluster.management.raft.commands.ClusterNodeMessage;
import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
* Tests for the {@link CmgRaftGroupListener}.
*/
public class CmgRaftGroupListenerTest extends BaseIgniteAbstractTest {
private final ClusterStateStorage storage = spy(new TestClusterStateStorage());
private final LongConsumer onLogicalTopologyChanged = mock(LongConsumer.class);
private final LogicalTopology logicalTopology = spy(new LogicalTopologyImpl(storage));
private CmgRaftGroupListener listener;
private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
private final ClusterTag clusterTag = clusterTag(msgFactory, "cluster");
private final ClusterState state = msgFactory.clusterState()
.cmgNodes(Set.copyOf(Set.of("foo")))
.metaStorageNodes(Set.copyOf(Set.of("bar")))
.version(IgniteProductVersion.CURRENT_VERSION.toString())
.clusterTag(clusterTag)
.build();
private final ClusterNodeMessage node = msgFactory.clusterNodeMessage().id("foo").name("bar").host("localhost").port(666).build();
@BeforeEach
void setUp() {
assertThat(storage.startAsync(), willCompleteSuccessfully());
listener = new CmgRaftGroupListener(storage, logicalTopology, onLogicalTopologyChanged);
}
@AfterEach
void tearDown() {
assertThat(storage.stopAsync(), willCompleteSuccessfully());
}
/**
* Test that validated nodes get added and removed from the storage.
*/
@Test
void testValidatedNodes() {
listener.onWrite(iterator(msgFactory.initCmgStateCommand().node(node).clusterState(state).build()));
listener.onWrite(iterator(msgFactory.joinRequestCommand().node(node).version(state.version()).clusterTag(clusterTag).build()));
assertThat(listener.storage().getValidatedNodes(), contains(new LogicalNode(node.asClusterNode())));
listener.onWrite(iterator(msgFactory.joinReadyCommand().node(node).build()));
assertThat(listener.storage().getValidatedNodes(), is(empty()));
}
@Test
void successfulJoinReadyExecutesOnLogicalTopologyChanged() {
listener.onWrite(iterator(msgFactory.initCmgStateCommand().node(node).clusterState(state).build()));
JoinRequestCommand joinRequestCommand = msgFactory.joinRequestCommand()
.node(node)
.version(state.version())
.clusterTag(state.clusterTag())
.build();
listener.onWrite(iterator(joinRequestCommand));
listener.onWrite(iterator(msgFactory.joinReadyCommand().node(node).build()));
verify(onLogicalTopologyChanged).accept(anyLong());
}
@Test
void unsuccessfulJoinReadyDoesNotExecuteOnLogicalTopologyChanged() {
listener.onWrite(iterator(msgFactory.joinReadyCommand().node(node).build()));
verify(onLogicalTopologyChanged, never()).accept(anyLong());
}
@Test
void nodesLeaveExecutesOnLogicalTopologyChanged() {
listener.onWrite(iterator(msgFactory.nodesLeaveCommand().nodes(Set.of(node)).build()));
verify(onLogicalTopologyChanged).accept(anyLong());
}
@Test
void restoreFromSnapshotTriggersTopologyLeapEvent() {
doNothing().when(storage).restoreSnapshot(any());
assertTrue(listener.onSnapshotLoad(Paths.get("/unused")));
verify(logicalTopology).fireTopologyLeap();
}
@Test
void absentClusterConfigUpdateErasesClusterConfig() {
ClusterState clusterState = msgFactory.clusterState()
.cmgNodes(Set.copyOf(Set.of("foo")))
.metaStorageNodes(Set.copyOf(Set.of("bar")))
.version(IgniteProductVersion.CURRENT_VERSION.toString())
.clusterTag(clusterTag)
.initialClusterConfiguration("config")
.build();
listener.onWrite(iterator(msgFactory.initCmgStateCommand().node(node).clusterState(clusterState).build()));
Collection<String> cmgNodes = clusterState.cmgNodes();
Collection<String> msNodes = clusterState.metaStorageNodes();
IgniteProductVersion igniteVersion = clusterState.igniteVersion();
ClusterTag clusterTag1 = clusterState.clusterTag();
ClusterState clusterStateToUpdate = msgFactory.clusterState()
.cmgNodes(Set.copyOf(cmgNodes))
.metaStorageNodes(Set.copyOf(msNodes))
.version(igniteVersion.toString())
.clusterTag(clusterTag1)
.build();
listener.onWrite(iterator(msgFactory.updateClusterStateCommand().clusterState(clusterStateToUpdate).build()));
ClusterState updatedClusterState = listener.storage().getClusterState();
assertAll(
() -> assertNull(updatedClusterState.initialClusterConfiguration()),
() -> assertEquals(updatedClusterState.cmgNodes(), clusterState.cmgNodes()),
() -> assertEquals(updatedClusterState.metaStorageNodes(), clusterState.metaStorageNodes()),
() -> assertEquals(updatedClusterState.version(), clusterState.version()),
() -> assertEquals(updatedClusterState.igniteVersion(), clusterState.igniteVersion()),
() -> assertEquals(updatedClusterState.clusterTag(), clusterState.clusterTag())
);
}
private static <T extends Command> Iterator<CommandClosure<T>> iterator(T obj) {
CommandClosure<T> closure = new CommandClosure<>() {
@Override
public T command() {
return obj;
}
@Override
public void result(@Nullable Serializable res) {
// no-op.
}
};
return List.of(closure).iterator();
}
}