| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal; |
| |
| import java.util.Collection; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.events.DiscoveryEvent; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteClosure; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.junit.Test; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| |
| /** |
| * Tests discovery event topology snapshots. |
| */ |
| public class GridDiscoveryEventSelfTest extends GridCommonAbstractTest { |
| /** Daemon flag. */ |
| private boolean daemon; |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| super.beforeTest(); |
| |
| daemon = false; |
| } |
| |
| /** */ |
| private static final IgniteClosure<ClusterNode, UUID> NODE_2ID = new IgniteClosure<ClusterNode, UUID>() { |
| @Override public UUID apply(ClusterNode n) { |
| return n.id(); |
| } |
| |
| @Override public String toString() { |
| return "Grid node shadow to node ID transformer closure."; |
| } |
| }; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration c = super.getConfiguration(igniteInstanceName); |
| |
| c.setDaemon(daemon); |
| c.setConnectorConfiguration(null); |
| |
| return c; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testJoinSequenceEvents() throws Exception { |
| try { |
| Ignite g0 = startGrid(0); |
| |
| UUID id0 = g0.cluster().localNode().id(); |
| |
| final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>(); |
| |
| final CountDownLatch latch = new CountDownLatch(3); |
| |
| g0.events().localListen(new IgnitePredicate<Event>() { |
| private AtomicInteger cnt = new AtomicInteger(); |
| |
| @Override public boolean apply(Event evt) { |
| assert evt.type() == EVT_NODE_JOINED : evt; |
| |
| evts.put(cnt.getAndIncrement(), ((DiscoveryEvent)evt).topologyNodes()); |
| |
| latch.countDown(); |
| |
| return true; |
| } |
| }, EVT_NODE_JOINED); |
| |
| UUID id1 = startGrid(1).cluster().localNode().id(); |
| UUID id2 = startGrid(2).cluster().localNode().id(); |
| UUID id3 = startGrid(3).cluster().localNode().id(); |
| |
| assertTrue("Wrong count of events received: " + evts, latch.await(3000, MILLISECONDS)); |
| |
| Collection<ClusterNode> top0 = evts.get(0); |
| |
| assertNotNull(top0); |
| assertEquals(2, top0.size()); |
| assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id0)); |
| assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id1)); |
| |
| Collection<ClusterNode> top1 = evts.get(1); |
| |
| assertNotNull(top1); |
| assertEquals(3, top1.size()); |
| assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id0)); |
| assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id1)); |
| assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id2)); |
| |
| Collection<ClusterNode> top2 = evts.get(2); |
| |
| assertNotNull(top2); |
| assertEquals(4, top2.size()); |
| assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id0)); |
| assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id1)); |
| assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id2)); |
| assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id3)); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLeaveSequenceEvents() throws Exception { |
| try { |
| Ignite g0 = startGrid(0); |
| |
| UUID id0 = g0.cluster().localNode().id(); |
| UUID id1 = startGrid(1).cluster().localNode().id(); |
| UUID id2 = startGrid(2).cluster().localNode().id(); |
| UUID id3 = startGrid(3).cluster().localNode().id(); |
| |
| final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>(); |
| |
| final CountDownLatch latch = new CountDownLatch(3); |
| |
| g0.events().localListen(new IgnitePredicate<Event>() { |
| private AtomicInteger cnt = new AtomicInteger(); |
| |
| @Override public boolean apply(Event evt) { |
| assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED : evt; |
| |
| evts.put(cnt.getAndIncrement(), ((DiscoveryEvent)evt).topologyNodes()); |
| |
| latch.countDown(); |
| |
| return true; |
| } |
| }, EVT_NODE_LEFT, EVT_NODE_FAILED); |
| |
| stopGrid(3); |
| stopGrid(2); |
| stopGrid(1); |
| |
| assertTrue("Wrong count of events received: " + evts, latch.await(3000, MILLISECONDS)); |
| |
| Collection<ClusterNode> top2 = evts.get(0); |
| |
| assertNotNull(top2); |
| assertEquals(3, top2.size()); |
| assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id0)); |
| assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id1)); |
| assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id2)); |
| assertFalse(F.viewReadOnly(top2, NODE_2ID).contains(id3)); |
| |
| Collection<ClusterNode> top1 = evts.get(1); |
| |
| assertNotNull(top1); |
| assertEquals(2, top1.size()); |
| assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id0)); |
| assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id1)); |
| assertFalse(F.viewReadOnly(top1, NODE_2ID).contains(id2)); |
| assertFalse(F.viewReadOnly(top1, NODE_2ID).contains(id3)); |
| |
| Collection<ClusterNode> top0 = evts.get(2); |
| |
| assertNotNull(top0); |
| assertEquals(1, top0.size()); |
| assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id0)); |
| assertFalse(F.viewReadOnly(top0, NODE_2ID).contains(id1)); |
| assertFalse(F.viewReadOnly(top0, NODE_2ID).contains(id2)); |
| assertFalse(F.viewReadOnly(top0, NODE_2ID).contains(id3)); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMixedSequenceEvents() throws Exception { |
| try { |
| Ignite g0 = startGrid(0); |
| |
| UUID id0 = g0.cluster().localNode().id(); |
| |
| final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>(); |
| |
| final CountDownLatch latch = new CountDownLatch(8); |
| |
| g0.events().localListen(new IgnitePredicate<Event>() { |
| private AtomicInteger cnt = new AtomicInteger(); |
| |
| @Override public boolean apply(Event evt) { |
| assert evt.type() == EVT_NODE_JOINED |
| || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED : evt; |
| |
| evts.put(cnt.getAndIncrement(), ((DiscoveryEvent)evt).topologyNodes()); |
| |
| latch.countDown(); |
| |
| return true; |
| } |
| }, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); |
| |
| UUID id1 = startGrid(1).cluster().localNode().id(); |
| UUID id2 = startGrid(2).cluster().localNode().id(); |
| UUID id3 = startGrid(3).cluster().localNode().id(); |
| |
| stopGrid(3); |
| stopGrid(2); |
| stopGrid(1); |
| |
| UUID id4 = startGrid(4).cluster().localNode().id(); |
| |
| stopGrid(4); |
| |
| assertTrue("Wrong count of events received [cnt= " + evts.size() + ", evts=" + evts + ']', |
| latch.await(3000, MILLISECONDS)); |
| |
| Collection<ClusterNode> top0 = evts.get(0); |
| |
| assertNotNull(top0); |
| assertEquals(2, top0.size()); |
| assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id0)); |
| assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id1)); |
| |
| Collection<ClusterNode> top1 = evts.get(1); |
| |
| assertNotNull(top1); |
| assertEquals(3, top1.size()); |
| assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id0)); |
| assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id1)); |
| assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id2)); |
| |
| Collection<ClusterNode> top2 = evts.get(2); |
| |
| assertNotNull(top2); |
| assertEquals(4, top2.size()); |
| assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id0)); |
| assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id1)); |
| assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id2)); |
| assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id3)); |
| |
| Collection<ClusterNode> top3 = evts.get(3); |
| |
| assertNotNull(top3); |
| assertEquals(3, top3.size()); |
| assertTrue(F.viewReadOnly(top3, NODE_2ID).contains(id0)); |
| assertTrue(F.viewReadOnly(top3, NODE_2ID).contains(id1)); |
| assertTrue(F.viewReadOnly(top3, NODE_2ID).contains(id2)); |
| assertFalse(F.viewReadOnly(top3, NODE_2ID).contains(id3)); |
| |
| Collection<ClusterNode> top4 = evts.get(4); |
| |
| assertNotNull(top4); |
| assertEquals(2, top4.size()); |
| assertTrue(F.viewReadOnly(top4, NODE_2ID).contains(id0)); |
| assertTrue(F.viewReadOnly(top4, NODE_2ID).contains(id1)); |
| assertFalse(F.viewReadOnly(top4, NODE_2ID).contains(id2)); |
| assertFalse(F.viewReadOnly(top4, NODE_2ID).contains(id3)); |
| |
| Collection<ClusterNode> top5 = evts.get(5); |
| |
| assertNotNull(top5); |
| assertEquals(1, top5.size()); |
| assertTrue(F.viewReadOnly(top5, NODE_2ID).contains(id0)); |
| assertFalse(F.viewReadOnly(top5, NODE_2ID).contains(id1)); |
| assertFalse(F.viewReadOnly(top5, NODE_2ID).contains(id2)); |
| assertFalse(F.viewReadOnly(top5, NODE_2ID).contains(id3)); |
| |
| Collection<ClusterNode> top6 = evts.get(6); |
| |
| assertNotNull(top6); |
| assertEquals(2, top6.size()); |
| assertTrue(F.viewReadOnly(top6, NODE_2ID).contains(id0)); |
| assertTrue(F.viewReadOnly(top6, NODE_2ID).contains(id4)); |
| assertFalse(F.viewReadOnly(top6, NODE_2ID).contains(id1)); |
| assertFalse(F.viewReadOnly(top6, NODE_2ID).contains(id2)); |
| assertFalse(F.viewReadOnly(top6, NODE_2ID).contains(id3)); |
| |
| Collection<ClusterNode> top7 = evts.get(7); |
| |
| assertNotNull(top7); |
| assertEquals(1, top7.size()); |
| assertTrue(F.viewReadOnly(top7, NODE_2ID).contains(id0)); |
| assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id1)); |
| assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id2)); |
| assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id3)); |
| assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id4)); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testConcurrentJoinEvents() throws Exception { |
| try { |
| Ignite g0 = startGrid(0); |
| |
| UUID id0 = g0.cluster().localNode().id(); |
| |
| final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>(); |
| |
| g0.events().localListen(new IgnitePredicate<Event>() { |
| private AtomicInteger cnt = new AtomicInteger(); |
| |
| @Override public boolean apply(Event evt) { |
| assert evt.type() == EVT_NODE_JOINED : evt; |
| |
| X.println(">>>>>>> Joined " + F.viewReadOnly(((DiscoveryEvent)evt).topologyNodes(), |
| NODE_2ID)); |
| |
| evts.put(cnt.getAndIncrement(), ((DiscoveryEvent)evt).topologyNodes()); |
| |
| return true; |
| } |
| }, EVT_NODE_JOINED); |
| |
| U.sleep(100); |
| |
| startGridsMultiThreaded(1, 10); |
| |
| U.sleep(100); |
| |
| assertEquals(10, evts.size()); |
| |
| for (int i = 0; i < 10; i++) { |
| Collection<ClusterNode> snapshot = evts.get(i); |
| |
| assertEquals(2 + i, snapshot.size()); |
| assertTrue(F.viewReadOnly(snapshot, NODE_2ID).contains(id0)); |
| |
| for (ClusterNode n : snapshot) |
| assertTrue("Wrong node order in snapshot [i=" + i + ", node=" + n + ']', n.order() <= 2 + i); |
| } |
| |
| Collection<UUID> ids = F.viewReadOnly(evts.get(9), NODE_2ID); |
| |
| for (int i = 1; i <= 10; i++) |
| assertTrue(ids.contains(grid(i).localNode().id())); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDaemonNodeJoin() throws Exception { |
| try { |
| startGridsMultiThreaded(3); |
| |
| final AtomicReference<IgniteCheckedException> err = new AtomicReference<>(); |
| |
| for (int i = 0; i < 3; i++) { |
| Ignite g = grid(i); |
| |
| g.events().localListen(new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| DiscoveryEvent discoEvt = (DiscoveryEvent)evt; |
| |
| if (discoEvt.topologyNodes().size() != 3) |
| err.compareAndSet(null, new IgniteCheckedException("Invalid discovery event [evt=" + discoEvt + |
| ", nodes=" + discoEvt.topologyNodes() + ']')); |
| |
| return true; |
| } |
| }, EventType.EVT_NODE_JOINED); |
| } |
| |
| daemon = true; |
| |
| IgniteKernal daemon = (IgniteKernal)startGrid(3); |
| |
| DiscoveryEvent join = daemon.context().discovery().localJoinEvent(); |
| |
| assertEquals(3, join.topologyNodes().size()); |
| |
| U.sleep(100); |
| |
| if (err.get() != null) |
| throw err.get(); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| } |