| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.cluster.management.topology; |
| |
| import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.empty; |
| import static org.hamcrest.Matchers.hasItems; |
| import static org.hamcrest.Matchers.is; |
| import static org.hamcrest.Matchers.not; |
| import static org.hamcrest.Matchers.notNullValue; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgnitionManager; |
| import org.apache.ignite.internal.ClusterPerTestIntegrationTest; |
| import org.apache.ignite.internal.app.IgniteImpl; |
| import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; |
| import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; |
| import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; |
| import org.apache.ignite.internal.network.message.ScaleCubeMessage; |
| import org.apache.ignite.internal.tostring.S; |
| import org.intellij.lang.annotations.Language; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.TestInfo; |
| |
| /** |
| * Integration tests for functionality of logical topology events subscription. |
| */ |
| @SuppressWarnings("resource") |
| class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest { |
| private final BlockingQueue<Event> events = new LinkedBlockingQueue<>(); |
| |
| private static final Map<String, String> NODE_ATTRIBUTES_MAP = Map.of("region", "US", "storage", "SSD"); |
| |
| private static final String[] STORAGE_PROFILES_LIST = {"lru_rocks", "segmented_aipersist"}; |
| |
| @Language("HOCON") |
| private static final String NODE_BOOTSTRAP_CFG_TEMPLATE_WITH_NODE_ATTRIBUTES_AND_STORAGE_PROFILES = "{\n" |
| + " network: {\n" |
| + " port: {},\n" |
| + " nodeFinder.netClusterNodes: [ {} ]\n" |
| + " },\n" |
| + " nodeAttributes.nodeAttributes: {region.attribute = US, storage.attribute = SSD},\n" |
| + " storage.profiles: {lru_rocks.engine = rocksdb, segmented_aipersist.engine = aipersist},\n" |
| + " clientConnector.port: {},\n" |
| + " rest.port: {}\n" |
| + "}"; |
| |
| private final LogicalTopologyEventListener listener = new LogicalTopologyEventListener() { |
| @Override |
| public void onNodeValidated(LogicalNode validatedNode) { |
| events.add(new Event(EventType.VALIDATED, validatedNode, -1)); |
| } |
| |
| @Override |
| public void onNodeInvalidated(LogicalNode invalidatedNode) { |
| events.add(new Event(EventType.INVALIDATED, invalidatedNode, -1)); |
| } |
| |
| @Override |
| public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot newTopology) { |
| events.add(new Event(EventType.JOINED, joinedNode, newTopology.version())); |
| } |
| |
| @Override |
| public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) { |
| events.add(new Event(EventType.LEFT, leftNode, newTopology.version())); |
| } |
| }; |
| |
| @Override |
| protected int initialNodes() { |
| // We don't need any nodes to be started automatically. |
| return 0; |
| } |
| |
| @Override |
| protected String getNodeBootstrapConfigTemplate() { |
| return FAST_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE; |
| } |
| |
| @Test |
| void receivesLogicalTopologyEvents() throws Exception { |
| cluster.startAndInit(1); |
| |
| IgniteImpl entryNode = node(0); |
| |
| entryNode.logicalTopologyService().addEventListener(listener); |
| |
| // Checking that onAppeared() is received. |
| Ignite secondIgnite = startNode(1); |
| |
| Event event = events.poll(10, TimeUnit.SECONDS); |
| |
| assertThat(event, is(notNullValue())); |
| assertThat(event.eventType, is(EventType.VALIDATED)); |
| assertThat(event.node.name(), is(secondIgnite.name())); |
| |
| event = events.poll(10, TimeUnit.SECONDS); |
| |
| assertThat(event, is(notNullValue())); |
| assertThat(event.eventType, is(EventType.JOINED)); |
| assertThat(event.node.name(), is(secondIgnite.name())); |
| assertThat(event.topologyVersion, is(2L)); |
| |
| assertThat(events, is(empty())); |
| |
| // Checking that onDisappeared() is received. |
| stopNode(1); |
| |
| event = events.poll(10, TimeUnit.SECONDS); |
| |
| assertThat(event, is(notNullValue())); |
| assertThat(event.eventType, is(EventType.LEFT)); |
| assertThat(event.node.name(), is(secondIgnite.name())); |
| assertThat(event.topologyVersion, is(3L)); |
| |
| assertThat(events, is(empty())); |
| } |
| |
| @Test |
| void receivesLogicalTopologyEventsWithAttributes() throws Exception { |
| cluster.startAndInit(1); |
| |
| IgniteImpl entryNode = node(0); |
| |
| entryNode.logicalTopologyService().addEventListener(listener); |
| |
| // Checking that onAppeared() is received. |
| Ignite secondIgnite = startNode(1, NODE_BOOTSTRAP_CFG_TEMPLATE_WITH_NODE_ATTRIBUTES_AND_STORAGE_PROFILES); |
| |
| Event event = events.poll(10, TimeUnit.SECONDS); |
| |
| assertThat(event, is(notNullValue())); |
| assertThat(event.eventType, is(EventType.VALIDATED)); |
| assertThat(event.node.name(), is(secondIgnite.name())); |
| assertThat(event.node.userAttributes(), is(NODE_ATTRIBUTES_MAP)); |
| assertThat(event.node.storageProfiles(), hasItems(STORAGE_PROFILES_LIST)); |
| |
| event = events.poll(10, TimeUnit.SECONDS); |
| |
| assertThat(event, is(notNullValue())); |
| assertThat(event.eventType, is(EventType.JOINED)); |
| assertThat(event.node.name(), is(secondIgnite.name())); |
| assertThat(event.topologyVersion, is(2L)); |
| assertThat(event.node.userAttributes(), is(NODE_ATTRIBUTES_MAP)); |
| assertThat(event.node.storageProfiles(), hasItems(STORAGE_PROFILES_LIST)); |
| |
| assertThat(events, is(empty())); |
| |
| // Checking that onDisappeared() is received. |
| stopNode(1); |
| |
| event = events.poll(10, TimeUnit.SECONDS); |
| |
| assertThat(event, is(notNullValue())); |
| assertThat(event.eventType, is(EventType.LEFT)); |
| assertThat(event.node.name(), is(secondIgnite.name())); |
| assertThat(event.topologyVersion, is(3L)); |
| assertThat(event.node.userAttributes(), is(Collections.emptyMap())); |
| assertThat(event.node.storageProfiles(), is(Collections.emptyList())); |
| |
| assertThat(events, is(empty())); |
| } |
| |
| @Test |
| void receiveLogicalTopologyFromLeaderWithAttributes() throws Exception { |
| cluster.startAndInit(1); |
| |
| IgniteImpl entryNode = node(0); |
| |
| IgniteImpl secondIgnite = startNode(1, NODE_BOOTSTRAP_CFG_TEMPLATE_WITH_NODE_ATTRIBUTES_AND_STORAGE_PROFILES); |
| |
| List<LogicalNode> logicalTopologyFromLeader = new ArrayList<>( |
| entryNode.logicalTopologyService().logicalTopologyOnLeader().get(5, TimeUnit.SECONDS).nodes() |
| ); |
| |
| assertEquals(2, logicalTopologyFromLeader.size()); |
| |
| Optional<LogicalNode> secondNode = logicalTopologyFromLeader.stream().filter(n -> n.name().equals(secondIgnite.name())).findFirst(); |
| |
| assertTrue(secondNode.isPresent()); |
| |
| assertThat(secondNode.get().userAttributes(), is(NODE_ATTRIBUTES_MAP)); |
| assertThat(secondNode.get().storageProfiles(), hasItems(STORAGE_PROFILES_LIST)); |
| } |
| |
| @Test |
| void receivesLogicalTopologyEventsCausedByNodeRestart() throws Exception { |
| cluster.startAndInit(1); |
| |
| IgniteImpl entryNode = node(0); |
| |
| Ignite secondIgnite = startNode(1); |
| |
| entryNode.logicalTopologyService().addEventListener(listener); |
| |
| restartNode(1); |
| |
| Event event = events.poll(10, TimeUnit.SECONDS); |
| |
| assertThat(event, is(notNullValue())); |
| assertThat(event.eventType, is(EventType.LEFT)); |
| assertThat(event.node.name(), is(secondIgnite.name())); |
| assertThat(event.topologyVersion, is(3L)); |
| |
| event = events.poll(10, TimeUnit.SECONDS); |
| |
| assertThat(event, is(notNullValue())); |
| assertThat(event.eventType, is(EventType.VALIDATED)); |
| assertThat(event.node.name(), is(secondIgnite.name())); |
| |
| event = events.poll(10, TimeUnit.SECONDS); |
| |
| assertThat(event, is(notNullValue())); |
| assertThat(event.eventType, is(EventType.JOINED)); |
| assertThat(event.node.name(), is(secondIgnite.name())); |
| assertThat(event.topologyVersion, is(4L)); |
| |
| assertThat(events, is(empty())); |
| } |
| |
| @Test |
| void nodeReturnedToPhysicalTopologyDoesNotReturnToLogicalTopology() throws Exception { |
| cluster.startAndInit(1); |
| |
| IgniteImpl entryNode = node(0); |
| |
| IgniteImpl secondIgnite = startNode(1); |
| |
| makeSecondNodeDisappearForFirstNode(entryNode, secondIgnite); |
| |
| CountDownLatch secondIgniteAppeared = new CountDownLatch(1); |
| |
| entryNode.logicalTopologyService().addEventListener(new LogicalTopologyEventListener() { |
| @Override |
| public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot newTopology) { |
| if (joinedNode.name().equals(secondIgnite.name())) { |
| secondIgniteAppeared.countDown(); |
| } |
| } |
| }); |
| |
| entryNode.stopDroppingMessages(); |
| |
| assertFalse(secondIgniteAppeared.await(3, TimeUnit.SECONDS), "Second node returned to logical topology"); |
| } |
| |
| private static void makeSecondNodeDisappearForFirstNode(IgniteImpl firstIgnite, IgniteImpl secondIgnite) throws InterruptedException { |
| CountDownLatch secondIgniteDisappeared = new CountDownLatch(1); |
| |
| firstIgnite.logicalTopologyService().addEventListener(new LogicalTopologyEventListener() { |
| @Override |
| public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) { |
| if (leftNode.name().equals(secondIgnite.name())) { |
| secondIgniteDisappeared.countDown(); |
| } |
| } |
| }); |
| |
| firstIgnite.dropMessages((recipientConsistentId, message) -> |
| secondIgnite.node().name().equals(recipientConsistentId) && message instanceof ScaleCubeMessage); |
| |
| assertTrue(secondIgniteDisappeared.await(10, TimeUnit.SECONDS), "Did not see second node leaving in time"); |
| } |
| |
| @Test |
| void nodeLeavesLogicalTopologyImmediatelyAfterBeingLostBySwim() throws Exception { |
| cluster.startAndInit(1); |
| |
| IgniteImpl entryNode = node(0); |
| |
| IgniteImpl secondNode = startNode(1); |
| |
| entryNode.logicalTopologyService().addEventListener(listener); |
| |
| // Knock the node out without allowing it to say good bye. |
| cluster.simulateNetworkPartitionOf(1); |
| |
| Event leaveEvent = events.poll(10, TimeUnit.SECONDS); |
| |
| assertThat(leaveEvent, is(notNullValue())); |
| assertThat(leaveEvent.node.name(), is(secondNode.name())); |
| assertThat(leaveEvent.eventType, is(EventType.LEFT)); |
| } |
| |
| @Test |
| void nodeThatCouldNotJoinShouldBeInvalidated(TestInfo testInfo) throws Exception { |
| cluster.startAndInit(1); |
| |
| IgniteImpl entryNode = node(0); |
| |
| entryNode.logicalTopologyService().addEventListener(listener); |
| |
| // Disable messaging to the second node as soon as it is validated. This will emulate a situation when a node leaves after |
| // validation, but before joining the cluster. |
| entryNode.logicalTopologyService().addEventListener(new LogicalTopologyEventListener() { |
| @Override |
| public void onNodeValidated(LogicalNode validatedNode) { |
| entryNode.dropMessages((recipientConsistentId, message) -> validatedNode.name().equals(recipientConsistentId)); |
| } |
| }); |
| |
| cluster.startNodeAsync(1); |
| |
| try { |
| Event event = events.poll(10, TimeUnit.SECONDS); |
| |
| assertThat(event, is(notNullValue())); |
| assertThat(event.eventType, is(EventType.VALIDATED)); |
| assertThat(event.node.name(), is(not(entryNode.name()))); |
| |
| event = events.poll(10, TimeUnit.SECONDS); |
| |
| assertThat(event, is(notNullValue())); |
| assertThat(event.eventType, is(EventType.INVALIDATED)); |
| assertThat(event.node.name(), is(not(entryNode.name()))); |
| } finally { |
| // Stop the second node manually, because it couldn't start successfully. |
| IgnitionManager.stop(testNodeName(testInfo, 1)); |
| } |
| } |
| |
| @Test |
| void nodeLeavesLogicalTopologyImmediatelyOnGracefulStop() throws Exception { |
| cluster.startAndInit(1, DISABLED_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE, ignored -> {}); |
| |
| IgniteImpl entryNode = node(0); |
| |
| IgniteImpl secondIgnite = startNode(1); |
| |
| entryNode.logicalTopologyService().addEventListener(listener); |
| |
| stopNode(1); |
| |
| Event leaveEvent = events.poll(10, TimeUnit.SECONDS); |
| |
| assertThat(events, is(empty())); |
| |
| assertThat("Leave event not received in time", leaveEvent, is(notNullValue())); |
| |
| assertThat(leaveEvent.eventType, is(EventType.LEFT)); |
| assertThat(leaveEvent.node.name(), is(secondIgnite.name())); |
| } |
| |
| private static class Event { |
| private final EventType eventType; |
| private final LogicalNode node; |
| private final long topologyVersion; |
| |
| private Event(EventType eventType, LogicalNode node, long topologyVersion) { |
| this.eventType = eventType; |
| this.node = node; |
| this.topologyVersion = topologyVersion; |
| } |
| |
| @Override |
| public String toString() { |
| return S.toString(this); |
| } |
| } |
| |
| private enum EventType { |
| JOINED, LEFT, VALIDATED, INVALIDATED |
| } |
| } |