| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.messaging; |
| |
| import java.io.Externalizable; |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.io.Serializable; |
| import java.net.URL; |
| import java.net.URLClassLoader; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentLinkedDeque; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteMessaging; |
| import org.apache.ignite.cluster.ClusterGroup; |
| import org.apache.ignite.internal.DiscoverySpiTestListener; |
| import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; |
| import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage; |
| import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2; |
| import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage; |
| import org.apache.ignite.internal.util.GridConcurrentHashSet; |
| import org.apache.ignite.internal.util.typedef.P2; |
| import org.apache.ignite.internal.util.typedef.PA; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.config.GridTestProperties; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.testframework.GridTestUtils.assertThrows; |
| |
| /** |
| * Various tests for Messaging public API. |
| */ |
| public class GridMessagingSelfTest extends GridCommonAbstractTest implements Serializable { |
| /** Payload of the first test message. */ |
| private static final String MSG_1 = "MSG-1"; |
| |
| /** Payload of the second test message. */ |
| private static final String MSG_2 = "MSG-2"; |
| |
| /** Payload of the third test message. */ |
| private static final String MSG_3 = "MSG-3"; |
| |
| /** First string topic. */ |
| private static final String S_TOPIC_1 = "TOPIC-1"; |
| |
| /** Second string topic. */ |
| private static final String S_TOPIC_2 = "TOPIC-2"; |
| |
| /** First integer topic. */ |
| private static final Integer I_TOPIC_1 = 1; |
| |
| /** Second integer topic. */ |
| private static final Integer I_TOPIC_2 = 2; |
| |
| /** Message count; re-created before every test and incremented by the listener of the oldest-node test. */ |
| private static AtomicInteger MSG_CNT; |
| |
| /** Name of the class that is loaded through an external class loader and sent as a message. */ |
| public static final String EXT_RESOURCE_CLS_NAME = "org.apache.ignite.tests.p2p.TestUserResource"; |
| |
| /** Latch counted down by the listeners of the remote-listen tests; each of those tests re-creates it. */ |
| protected static CountDownLatch rcvLatch; |
| |
| /** |
| * A test message topic; used to exercise messaging with enum values as topics. |
| */ |
| private enum TestTopic { |
| /** First enum topic. */ |
| TOPIC_1, |
| |
| /** Second enum topic. */ |
| TOPIC_2 |
| } |
| |
| /** |
| * A test message with a hack for delay |
| * emulation. |
| */ |
| private static class TestMessage implements Externalizable { |
| /** */ |
| private Object body; |
| |
| /** */ |
| private long delayMs; |
| |
| /** |
| * No-arg constructor for {@link Externalizable}. |
| */ |
| public TestMessage() { |
| // No-op. |
| } |
| |
| /** |
| * @param body Message body. |
| */ |
| TestMessage(Object body) { |
| this.body = body; |
| } |
| |
| /** |
| * @param body Message body. |
| * @param delayMs Message send delay in milliseconds. |
| */ |
| TestMessage(Object body, long delayMs) { |
| this.body = body; |
| this.delayMs = delayMs; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return "TestMessage [body=" + body + "]"; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| return body.hashCode(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object obj) { |
| return obj instanceof TestMessage && body.equals(((TestMessage)obj).body); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| if (delayMs > 0) { |
| try { |
| Thread.sleep(delayMs); |
| } |
| catch (InterruptedException ignored) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| out.writeObject(body); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| body = in.readObject(); |
| } |
| } |
| |
| /** Node started with index 1 in {@code beforeTest()}; it is started first. */ |
| protected Ignite ignite1; |
| |
| /** Node started with index 2 in {@code beforeTest()}; it is started second. */ |
| protected Ignite ignite2; |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| MSG_CNT = new AtomicInteger(); |
| |
| ignite1 = startGrid(1); |
| ignite2 = startGrid(2); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| stopAllGrids(); |
| |
| ignite1 = null; |
| ignite2 = null; |
| } |
| |
| /** |
| * Tests simple message sending-receiving. |
| * |
| * @throws Exception If error occurs. |
| */ |
| @Test |
| public void testSendReceiveMessage() throws Exception { |
| final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>(); |
| |
| final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable |
| |
| final CountDownLatch rcvLatch = new CountDownLatch(3); |
| |
| ignite1.message().localListen(null, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| try { |
| log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); |
| |
| if (!nodeId.equals(ignite2.cluster().localNode().id())) { |
| log.error("Unexpected sender node: " + nodeId); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| rcvMsgs.add(msg); |
| |
| return true; |
| } |
| finally { |
| rcvLatch.countDown(); |
| } |
| } |
| }); |
| |
| ClusterGroup rNode1 = ignite2.cluster().forRemotes(); |
| |
| message(rNode1).send(null, MSG_1); |
| message(rNode1).send(null, MSG_2); |
| message(rNode1).send(null, MSG_3); |
| |
| assertTrue(rcvLatch.await(3, TimeUnit.SECONDS)); |
| |
| assertFalse(error.get()); |
| |
| assertTrue(rcvMsgs.contains(MSG_1)); |
| assertTrue(rcvMsgs.contains(MSG_2)); |
| assertTrue(rcvMsgs.contains(MSG_3)); |
| } |
| |
| /** |
| * @throws Exception If error occurs. |
| */ |
| @SuppressWarnings("TooBroadScope") |
| @Test |
| public void testStopLocalListen() throws Exception { |
| final AtomicInteger msgCnt1 = new AtomicInteger(); |
| |
| final AtomicInteger msgCnt2 = new AtomicInteger(); |
| |
| final AtomicInteger msgCnt3 = new AtomicInteger(); |
| |
| P2<UUID, Object> lsnr1 = new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| log.info("Listener1 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); |
| |
| msgCnt1.incrementAndGet(); |
| |
| return true; |
| } |
| }; |
| |
| P2<UUID, Object> lsnr2 = new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| log.info("Listener2 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); |
| |
| msgCnt2.incrementAndGet(); |
| |
| return true; |
| } |
| }; |
| |
| P2<UUID, Object> lsnr3 = new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| log.info("Listener3 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); |
| |
| msgCnt3.incrementAndGet(); |
| |
| return true; |
| } |
| }; |
| |
| final String topic1 = null; |
| final String topic2 = "top1"; |
| final String topic3 = "top3"; |
| |
| ignite1.message().localListen(topic1, lsnr1); |
| ignite1.message().localListen(topic2, lsnr2); |
| ignite1.message().localListen(topic3, lsnr3); |
| |
| ClusterGroup rNode1 = ignite2.cluster().forRemotes(); |
| |
| message(rNode1).send(topic1, "msg1-1"); |
| message(rNode1).send(topic2, "msg1-2"); |
| message(rNode1).send(topic3, "msg1-3"); |
| |
| GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return msgCnt1.get() > 0 && msgCnt2.get() > 0 && msgCnt3.get() > 0; |
| } |
| }, 5000); |
| |
| assertEquals(1, msgCnt1.get()); |
| assertEquals(1, msgCnt2.get()); |
| assertEquals(1, msgCnt3.get()); |
| |
| ignite1.message().stopLocalListen(topic2, lsnr2); |
| |
| message(rNode1).send(topic1, "msg2-1"); |
| message(rNode1).send(topic2, "msg2-2"); |
| message(rNode1).send(topic3, "msg2-3"); |
| |
| GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return msgCnt1.get() > 1 && msgCnt3.get() > 1; |
| } |
| }, 5000); |
| |
| assertEquals(2, msgCnt1.get()); |
| assertEquals(1, msgCnt2.get()); |
| assertEquals(2, msgCnt3.get()); |
| |
| ignite1.message().stopLocalListen(topic2, lsnr1); // Try to use wrong topic for lsnr1 removing. |
| |
| message(rNode1).send(topic1, "msg3-1"); |
| message(rNode1).send(topic2, "msg3-2"); |
| message(rNode1).send(topic3, "msg3-3"); |
| |
| GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return msgCnt1.get() > 2 && msgCnt3.get() > 2; |
| } |
| }, 5000); |
| |
| assertEquals(3, msgCnt1.get()); |
| assertEquals(1, msgCnt2.get()); |
| assertEquals(3, msgCnt3.get()); |
| |
| ignite1.message().stopLocalListen(topic1, lsnr1); |
| ignite1.message().stopLocalListen(topic3, lsnr3); |
| |
| message(rNode1).send(topic1, "msg4-1"); |
| message(rNode1).send(topic2, "msg4-2"); |
| message(rNode1).send(topic3, "msg4-3"); |
| |
| U.sleep(1000); |
| |
| assertEquals(3, msgCnt1.get()); |
| assertEquals(1, msgCnt2.get()); |
| assertEquals(3, msgCnt3.get()); |
| } |
| |
| /** |
| * Tests simple message sending-receiving with string topic. |
| * |
| * @throws Exception If error occurs. |
| */ |
| @Test |
| public void testSendReceiveMessageWithStringTopic() throws Exception { |
| final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>(); |
| |
| final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable |
| |
| final CountDownLatch rcvLatch = new CountDownLatch(3); |
| |
| ignite1.message().localListen(S_TOPIC_1, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| try { |
| log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + |
| ", topic=" + S_TOPIC_1 + ']'); |
| |
| if (!nodeId.equals(ignite1.cluster().localNode().id())) { |
| log.error("Unexpected sender node: " + nodeId); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| if (!MSG_1.equals(msg)) { |
| log.error("Unexpected message " + msg + " for topic: " + S_TOPIC_1); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| rcvMsgs.add(msg); |
| |
| return true; |
| } |
| finally { |
| rcvLatch.countDown(); |
| } |
| } |
| }); |
| |
| ignite1.message().localListen(S_TOPIC_2, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| try { |
| log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + |
| ", topic=" + S_TOPIC_2 + ']'); |
| |
| if (!nodeId.equals(ignite1.cluster().localNode().id())) { |
| log.error("Unexpected sender node: " + nodeId); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| if (!MSG_2.equals(msg)) { |
| log.error("Unexpected message " + msg + " for topic: " + S_TOPIC_2); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| rcvMsgs.add(msg); |
| |
| return true; |
| } |
| finally { |
| rcvLatch.countDown(); |
| } |
| } |
| }); |
| |
| ignite1.message().localListen(null, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| try { |
| log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + |
| ", topic=default]"); |
| |
| if (!nodeId.equals(ignite1.cluster().localNode().id())) { |
| log.error("Unexpected sender node: " + nodeId); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| if (!MSG_3.equals(msg)) { |
| log.error("Unexpected message " + msg + " for topic: default"); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| rcvMsgs.add(msg); |
| |
| return true; |
| } |
| finally { |
| rcvLatch.countDown(); |
| } |
| } |
| }); |
| |
| ClusterGroup rNode1 = ignite1.cluster().forLocal(); |
| |
| message(rNode1).send(S_TOPIC_1, MSG_1); |
| message(rNode1).send(S_TOPIC_2, MSG_2); |
| message(rNode1).send(null, MSG_3); |
| |
| assertTrue(rcvLatch.await(3, TimeUnit.SECONDS)); |
| |
| assertFalse(error.get()); |
| |
| assertTrue(rcvMsgs.contains(MSG_1)); |
| assertTrue(rcvMsgs.contains(MSG_2)); |
| assertTrue(rcvMsgs.contains(MSG_3)); |
| } |
| |
| /** |
| * Tests simple message sending-receiving with enumerated topic. |
| * |
| * @throws Exception If error occurs. |
| */ |
| @Test |
| public void testSendReceiveMessageWithEnumTopic() throws Exception { |
| final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>(); |
| |
| final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable |
| |
| final CountDownLatch rcvLatch = new CountDownLatch(3); |
| |
| ignite1.message().localListen(TestTopic.TOPIC_1, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| try { |
| log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + |
| ", topic=" + TestTopic.TOPIC_1 + ']'); |
| |
| if (!nodeId.equals(ignite1.cluster().localNode().id())) { |
| log.error("Unexpected sender node: " + nodeId); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| if (!MSG_1.equals(msg)) { |
| log.error("Unexpected message " + msg + " for topic: " + TestTopic.TOPIC_1); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| rcvMsgs.add(msg); |
| |
| return true; |
| } |
| finally { |
| rcvLatch.countDown(); |
| } |
| } |
| }); |
| |
| ignite1.message().localListen(TestTopic.TOPIC_2, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| try { |
| log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + |
| ", topic=" + TestTopic.TOPIC_2 + ']'); |
| |
| if (!nodeId.equals(ignite1.cluster().localNode().id())) { |
| log.error("Unexpected sender node: " + nodeId); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| if (!MSG_2.equals(msg)) { |
| log.error("Unexpected message " + msg + " for topic: " + TestTopic.TOPIC_2); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| rcvMsgs.add(msg); |
| |
| return true; |
| } |
| finally { |
| rcvLatch.countDown(); |
| } |
| } |
| }); |
| |
| ignite1.message().localListen(null, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| try { |
| log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + |
| ", topic=default]"); |
| |
| if (!nodeId.equals(ignite1.cluster().localNode().id())) { |
| log.error("Unexpected sender node: " + nodeId); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| if (!MSG_3.equals(msg)) { |
| log.error("Unexpected message " + msg + " for topic: default"); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| rcvMsgs.add(msg); |
| |
| return true; |
| } |
| finally { |
| rcvLatch.countDown(); |
| } |
| } |
| }); |
| |
| ClusterGroup rNode1 = ignite1.cluster().forLocal(); |
| |
| message(rNode1).send(TestTopic.TOPIC_1, MSG_1); |
| message(rNode1).send(TestTopic.TOPIC_2, MSG_2); |
| message(rNode1).send(null, MSG_3); |
| |
| assertTrue(rcvLatch.await(3, TimeUnit.SECONDS)); |
| |
| assertFalse(error.get()); |
| |
| assertTrue(rcvMsgs.contains(MSG_1)); |
| assertTrue(rcvMsgs.contains(MSG_2)); |
| assertTrue(rcvMsgs.contains(MSG_3)); |
| } |
| |
| /** |
| * Tests simple message sending-receiving with the use of |
| * remoteListen() method. |
| * |
| * @throws Exception If error occurs. |
| */ |
| @Test |
| public void testRemoteListen() throws Exception { |
| final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>(); |
| |
| rcvLatch = new CountDownLatch(4); |
| |
| ignite2.message().remoteListen(null, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| try { |
| log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); |
| |
| rcvMsgs.add(msg); |
| |
| return true; |
| } |
| finally { |
| rcvLatch.countDown(); |
| } |
| } |
| }); |
| |
| ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2. |
| |
| message(prj2).send(null, MSG_1); |
| message(prj2).send(null, MSG_2); |
| message(ignite2.cluster().forLocal()).send(null, MSG_3); |
| |
| assertFalse(rcvLatch.await(3, TimeUnit.SECONDS)); // We should get only 3 message. |
| |
| assertTrue(rcvMsgs.contains(MSG_1)); |
| assertTrue(rcvMsgs.contains(MSG_2)); |
| assertTrue(rcvMsgs.contains(MSG_3)); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testStopRemoteListen() throws Exception { |
| final AtomicInteger msgCnt1 = new AtomicInteger(); |
| |
| final AtomicInteger msgCnt2 = new AtomicInteger(); |
| |
| final AtomicInteger msgCnt3 = new AtomicInteger(); |
| |
| final String topic1 = null; |
| final String topic2 = "top2"; |
| final String topic3 = "top3"; |
| |
| UUID id1 = ignite2.message().remoteListen(topic1, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| System.out.println( |
| Thread.currentThread().getName() + " Listener1 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']' |
| ); |
| |
| msgCnt1.incrementAndGet(); |
| |
| return true; |
| } |
| }); |
| |
| UUID id2 = ignite2.message().remoteListen(topic2, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| System.out.println( |
| Thread.currentThread().getName() + " Listener2 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']' |
| ); |
| |
| msgCnt2.incrementAndGet(); |
| |
| return true; |
| } |
| }); |
| |
| UUID id3 = ignite2.message().remoteListen(topic3, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| System.out.println( |
| Thread.currentThread().getName() + " Listener3 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']' |
| ); |
| |
| msgCnt3.incrementAndGet(); |
| |
| return true; |
| } |
| }); |
| |
| message(ignite1.cluster().forRemotes()).send(topic1, "msg1-1"); |
| message(ignite1.cluster().forRemotes()).send(topic2, "msg1-2"); |
| message(ignite1.cluster().forRemotes()).send(topic3, "msg1-3"); |
| |
| GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return msgCnt1.get() > 0 && msgCnt2.get() > 0 && msgCnt3.get() > 0; |
| } |
| }, 5000); |
| |
| assertEquals(1, msgCnt1.get()); |
| assertEquals(1, msgCnt2.get()); |
| assertEquals(1, msgCnt3.get()); |
| |
| ignite2.message().stopRemoteListen(id2); |
| |
| message(ignite1.cluster().forRemotes()).send(topic1, "msg2-1"); |
| message(ignite1.cluster().forRemotes()).send(topic2, "msg2-2"); |
| message(ignite1.cluster().forRemotes()).send(topic3, "msg2-3"); |
| |
| GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return msgCnt1.get() > 1 && msgCnt3.get() > 1; |
| } |
| }, 5000); |
| |
| assertEquals(2, msgCnt1.get()); |
| assertEquals(1, msgCnt2.get()); |
| assertEquals(2, msgCnt3.get()); |
| |
| ignite2.message().stopRemoteListen(id2); // Try remove one more time. |
| |
| ignite2.message().stopRemoteListen(id1); |
| ignite2.message().stopRemoteListen(id3); |
| |
| message(ignite1.cluster().forRemotes()).send(topic1, "msg3-1"); |
| message(ignite1.cluster().forRemotes()).send(topic2, "msg3-2"); |
| message(ignite1.cluster().forRemotes()).send(topic3, "msg3-3"); |
| |
| U.sleep(1000); |
| |
| assertEquals(2, msgCnt1.get()); |
| assertEquals(1, msgCnt2.get()); |
| assertEquals(2, msgCnt3.get()); |
| } |
| |
| /** |
| * Tests simple message sending-receiving with the use of |
| * remoteListen() method. |
| * |
| * @throws Exception If error occurs. |
| */ |
| @Test |
| public void testRemoteListenOrderedMessages() throws Exception { |
| List<TestMessage> msgs = Arrays.asList( |
| new TestMessage(MSG_1), |
| new TestMessage(MSG_2, 3000), |
| new TestMessage(MSG_3)); |
| |
| final Collection<Object> rcvMsgs = new ConcurrentLinkedDeque<>(); |
| |
| final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable |
| |
| rcvLatch = new CountDownLatch(3); |
| |
| ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| try { |
| log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); |
| |
| if (!nodeId.equals(ignite1.cluster().localNode().id())) { |
| log.error("Unexpected sender node: " + nodeId); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| rcvMsgs.add(msg); |
| |
| return true; |
| } |
| finally { |
| rcvLatch.countDown(); |
| } |
| } |
| }); |
| |
| ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2. |
| |
| for (TestMessage msg : msgs) |
| message(prj2).sendOrdered(S_TOPIC_1, msg, 15000); |
| |
| assertTrue(rcvLatch.await(6, TimeUnit.SECONDS)); |
| |
| assertFalse(error.get()); |
| |
| assertEquals(msgs, Arrays.asList(rcvMsgs.toArray())); |
| } |
| |
| /** |
| * Tests simple message sending-receiving with the use of |
| * remoteListen() method and topics. |
| * |
| * @throws Exception If error occurs. |
| */ |
| @Test |
| public void testRemoteListenWithIntTopic() throws Exception { |
| final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>(); |
| |
| final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable |
| |
| rcvLatch = new CountDownLatch(3); |
| |
| ignite2.message().remoteListen(I_TOPIC_1, new P2<UUID, Object>() { |
| @IgniteInstanceResource |
| private transient Ignite g; |
| |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| assertEquals(ignite2, g); |
| |
| try { |
| log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + |
| ", topic=" + I_TOPIC_1 + ']'); |
| |
| if (!nodeId.equals(ignite1.cluster().localNode().id())) { |
| log.error("Unexpected sender node: " + nodeId); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| if (!MSG_1.equals(msg)) { |
| log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_1); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| rcvMsgs.add(msg); |
| |
| return true; |
| } |
| finally { |
| rcvLatch.countDown(); |
| } |
| } |
| }); |
| |
| ignite2.message().remoteListen(I_TOPIC_2, new P2<UUID, Object>() { |
| @IgniteInstanceResource |
| private transient Ignite g; |
| |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| assertEquals(ignite2, g); |
| |
| try { |
| log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + |
| ", topic=" + I_TOPIC_2 + ']'); |
| |
| if (!nodeId.equals(ignite1.cluster().localNode().id())) { |
| log.error("Unexpected sender node: " + nodeId); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| if (!MSG_2.equals(msg)) { |
| log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_2); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| rcvMsgs.add(msg); |
| |
| return true; |
| } |
| finally { |
| rcvLatch.countDown(); |
| } |
| } |
| }); |
| |
| ignite2.message().remoteListen(null, new P2<UUID, Object>() { |
| @IgniteInstanceResource |
| private transient Ignite g; |
| |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| assertEquals(ignite2, g); |
| |
| try { |
| log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + |
| ", topic=default]"); |
| |
| if (!nodeId.equals(ignite1.cluster().localNode().id())) { |
| log.error("Unexpected sender node: " + nodeId); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| if (!MSG_3.equals(msg)) { |
| log.error("Unexpected message " + msg + " for topic: default"); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| rcvMsgs.add(msg); |
| |
| return true; |
| } |
| finally { |
| rcvLatch.countDown(); |
| } |
| } |
| }); |
| |
| ClusterGroup prj2 = ignite1.cluster().forRemotes(); // Includes node from grid2. |
| |
| message(prj2).send(I_TOPIC_1, MSG_1); |
| message(prj2).send(I_TOPIC_2, MSG_2); |
| message(prj2).send(null, MSG_3); |
| |
| assertTrue(rcvLatch.await(3, TimeUnit.SECONDS)); |
| |
| assertFalse(error.get()); |
| |
| assertTrue(rcvMsgs.contains(MSG_1)); |
| assertTrue(rcvMsgs.contains(MSG_2)); |
| assertTrue(rcvMsgs.contains(MSG_3)); |
| } |
| |
| /** |
| * Checks, if it is OK to send the message, loaded with external |
| * class loader. |
| * |
| * @throws Exception If error occurs. |
| */ |
| @Test |
| public void testSendMessageWithExternalClassLoader() throws Exception { |
| URL[] urls = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))}; |
| |
| ClassLoader extLdr = new URLClassLoader(urls); |
| |
| Class rcCls = extLdr.loadClass(EXT_RESOURCE_CLS_NAME); |
| |
| final AtomicBoolean error = new AtomicBoolean(false); //to make it modifiable |
| |
| final CountDownLatch rcvLatch = new CountDownLatch(1); |
| |
| ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| try { |
| log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); |
| |
| if (!nodeId.equals(ignite1.cluster().localNode().id())) { |
| log.error("Unexpected sender node: " + nodeId); |
| |
| error.set(true); |
| |
| return false; |
| } |
| |
| return true; |
| } |
| finally { |
| rcvLatch.countDown(); |
| } |
| } |
| }); |
| |
| message(ignite1.cluster().forRemotes()).send(S_TOPIC_1, Collections.singleton(rcCls.newInstance())); |
| |
| assertTrue(rcvLatch.await(3, TimeUnit.SECONDS)); |
| |
| assertFalse(error.get()); |
| } |
| |
| /** |
| * Test case for {@code null} messages. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testNullMessages() throws Exception { |
| assertThrows(log, new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| ignite1.message().send(null, null); |
| |
| return null; |
| } |
| }, IllegalArgumentException.class, "Ouch! Argument is invalid: msgs cannot be null or empty"); |
| |
| assertThrows(log, new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| ignite1.message().send(null, Collections.emptyList()); |
| |
| return null; |
| } |
| }, IllegalArgumentException.class, "Ouch! Argument is invalid: msgs cannot be null or empty"); |
| |
| assertThrows(log, new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| ignite1.message().send(null, (Object)null); |
| |
| return null; |
| } |
| }, NullPointerException.class, "Ouch! Argument cannot be null: msg"); |
| |
| assertThrows(log, new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| ignite1.message().send(null, Arrays.asList(null, new Object())); |
| |
| return null; |
| } |
| }, NullPointerException.class, "Ouch! Argument cannot be null: msg"); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAsyncOld() throws Exception { |
| final AtomicInteger msgCnt = new AtomicInteger(); |
| |
| IgniteDiscoverySpi discoSpi = (IgniteDiscoverySpi)ignite2.configuration().getDiscoverySpi(); |
| |
| DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); |
| |
| discoSpi.setInternalListener(lsnr); |
| |
| assertFalse(ignite2.message().isAsync()); |
| |
| final IgniteMessaging msg = ignite2.message().withAsync(); |
| |
| assertTrue(msg.isAsync()); |
| |
| assertFalse(ignite2.message().isAsync()); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| msg.future(); |
| |
| return null; |
| } |
| }, IllegalStateException.class, null); |
| |
| lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class); |
| |
| final String topic = "topic"; |
| |
| UUID id = msg.remoteListen(topic, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| System.out.println(Thread.currentThread().getName() + |
| " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); |
| |
| msgCnt.incrementAndGet(); |
| |
| return true; |
| } |
| }); |
| |
| Assert.assertNull(id); |
| |
| IgniteFuture<UUID> starFut = msg.future(); |
| |
| Assert.assertNotNull(starFut); |
| |
| U.sleep(500); |
| |
| Assert.assertFalse(starFut.isDone()); |
| |
| lsnr.stopBlockCustomEvents(); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| msg.future(); |
| |
| return null; |
| } |
| }, IllegalStateException.class, null); |
| |
| id = starFut.get(); |
| |
| Assert.assertNotNull(id); |
| |
| Assert.assertTrue(starFut.isDone()); |
| |
| lsnr.blockCustomEvent(StopRoutineDiscoveryMessage.class); |
| |
| message(ignite1.cluster().forRemotes()).send(topic, "msg1"); |
| |
| GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return msgCnt.get() > 0; |
| } |
| }, 5000); |
| |
| assertEquals(1, msgCnt.get()); |
| |
| msg.stopRemoteListen(id); |
| |
| IgniteFuture<?> stopFut = msg.future(); |
| |
| Assert.assertNotNull(stopFut); |
| |
| GridTestUtils.assertThrows(log, new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| msg.future(); |
| |
| return null; |
| } |
| }, IllegalStateException.class, null); |
| |
| U.sleep(500); |
| |
| Assert.assertFalse(stopFut.isDone()); |
| |
| lsnr.stopBlockCustomEvents(); |
| |
| stopFut.get(); |
| |
| Assert.assertTrue(stopFut.isDone()); |
| |
| message(ignite1.cluster().forRemotes()).send(topic, "msg2"); |
| |
| U.sleep(1000); |
| |
| assertEquals(1, msgCnt.get()); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAsync() throws Exception { |
| final AtomicInteger msgCnt = new AtomicInteger(); |
| |
| IgniteDiscoverySpi discoSpi = (IgniteDiscoverySpi)ignite2.configuration().getDiscoverySpi(); |
| |
| DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); |
| |
| discoSpi.setInternalListener(lsnr); |
| |
| lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class); |
| |
| final String topic = "topic"; |
| |
| IgniteFuture<UUID> starFut = ignite2.message().remoteListenAsync(topic, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| System.out.println(Thread.currentThread().getName() + |
| " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); |
| |
| msgCnt.incrementAndGet(); |
| |
| return true; |
| } |
| }); |
| |
| Assert.assertNotNull(starFut); |
| |
| U.sleep(500); |
| |
| Assert.assertFalse(starFut.isDone()); |
| |
| lsnr.stopBlockCustomEvents(); |
| |
| UUID id = starFut.get(); |
| |
| Assert.assertNotNull(id); |
| |
| Assert.assertTrue(starFut.isDone()); |
| |
| lsnr.blockCustomEvent(StopRoutineDiscoveryMessage.class); |
| |
| message(ignite1.cluster().forRemotes()).send(topic, "msg1"); |
| |
| GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return msgCnt.get() > 0; |
| } |
| }, 5000); |
| |
| assertEquals(1, msgCnt.get()); |
| |
| IgniteFuture<?> stopFut = ignite2.message().stopRemoteListenAsync(id); |
| |
| Assert.assertNotNull(stopFut); |
| |
| U.sleep(500); |
| |
| Assert.assertFalse(stopFut.isDone()); |
| |
| lsnr.stopBlockCustomEvents(); |
| |
| stopFut.get(); |
| |
| Assert.assertTrue(stopFut.isDone()); |
| |
| message(ignite1.cluster().forRemotes()).send(topic, "msg2"); |
| |
| U.sleep(1000); |
| |
| assertEquals(1, msgCnt.get()); |
| } |
| |
| /** |
| * Tests that message listener registers only for one oldest node. |
| * <p> |
| * The check is run twice: first with node 1 as the expected oldest node, then again after node 1 |
| * has been restarted, when node 2 is expected to be the oldest one. |
| * |
| * @throws Exception If an error occurred. |
| */ |
| @Test |
| public void testRemoteListenForOldest() throws Exception { |
| remoteListenForOldest(ignite1); |
| |
| // Restart oldest node. |
| stopGrid(1); |
| |
| ignite1 = startGrid(1); |
| |
| // Reset the counter so that the second check starts counting from zero. |
| MSG_CNT.set(0); |
| |
| // Ignite2 is oldest now. |
| remoteListenForOldest(ignite2); |
| } |
| |
| /** |
| * @param expOldestIgnite Expected oldest ignite. |
| * @throws InterruptedException If failed. |
| */ |
| private void remoteListenForOldest(Ignite expOldestIgnite) throws InterruptedException { |
| ClusterGroup grp = ignite1.cluster().forOldest(); |
| |
| assertEquals(1, grp.nodes().size()); |
| assertEquals(expOldestIgnite.cluster().localNode().id(), grp.node().id()); |
| |
| ignite1.message(grp).remoteListen(null, new P2<UUID, Object>() { |
| @Override public boolean apply(UUID nodeId, Object msg) { |
| System.out.println("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); |
| |
| MSG_CNT.incrementAndGet(); |
| |
| return true; |
| } |
| }); |
| |
| ignite1.message().send(null, MSG_1); |
| |
| Thread.sleep(3000); |
| |
| assertEquals(1, MSG_CNT.get()); |
| } |
| } |