blob: 70304a687abfdf113b57ddd504c4971e8d0e2052 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.messaging;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
import org.junit.Test;
/**
*
*/
public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest implements Serializable {
/** Threads number for multi-thread tests. */
private static final int THREADS = 10;
/** */
private final String TOPIC = "topic";
/** */
private final String msgStr = "message";
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
super.afterTest();
}
/**
* Checks if use default mode, local listeners execute in the same thread, 1 node in topology.
*
* @throws Exception If failed.
*/
@Test
public void testSendDefaultMode() throws Exception {
Ignite ignite1 = startGrid(1);
send(ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread>() {
@Override public void apply(String msg, Thread thread) {
Assert.assertEquals(Thread.currentThread(), thread);
Assert.assertEquals(msgStr, msg);
}
}, false);
}
/**
* Checks if use async mode, local listeners execute in another thread, 1 node in topology.
*
* @throws Exception If failed.
*/
@Test
public void testSendAsyncMode() throws Exception {
Ignite ignite1 = startGrid(1);
send(ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread>() {
@Override public void apply(String msg, Thread thread) {
Assert.assertTrue(!Thread.currentThread().equals(thread));
Assert.assertEquals(msgStr, msg);
}
}, true);
}
/**
* Checks if use default mode, local listeners execute in the same thread, 2 nodes in topology.
*
* @throws Exception If failed.
*/
@Test
public void testSendDefaultMode2Nodes() throws Exception {
Ignite ignite1 = startGrid(1);
Ignite ignite2 = startGrid(2);
sendWith2Nodes(ignite2, ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread>() {
@Override public void apply(String msg, Thread thread) {
Assert.assertEquals(Thread.currentThread(), thread);
Assert.assertEquals(msgStr, msg);
}
}, false);
}
/**
* Checks if use async mode, local listeners execute in another thread, 2 nodes in topology.
*
* @throws Exception If failed.
*/
@Test
public void testSendAsyncMode2Node() throws Exception {
Ignite ignite1 = startGrid(1);
Ignite ignite2 = startGrid(2);
sendWith2Nodes(ignite2, ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread>() {
@Override public void apply(String msg, Thread thread) {
Assert.assertTrue(!Thread.currentThread().equals(thread));
Assert.assertEquals(msgStr, msg);
}
}, true);
}
/**
* Checks that sendOrdered works in thread pool, 1 node in topology.
*
* @throws Exception If failed.
*/
@Test
public void testSendOrderedDefaultMode() throws Exception {
Ignite ignite1 = startGrid(1);
final List<String> msgs = orderedMessages();
sendOrdered(ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
@Override public void apply(List<String> received, List<Thread> threads) {
assertFalse(threads.contains(Thread.currentThread()));
assertTrue(msgs.equals(received));
}
});
}
/**
* Checks that sendOrdered work in thread pool, 2 nodes in topology.
*
* @throws Exception If failed.
*/
@Test
public void testSendOrderedDefaultMode2Node() throws Exception {
Ignite ignite1 = startGrid(1);
Ignite ignite2 = startGrid(2);
final List<String> msgs = orderedMessages();
sendOrderedWith2Node(ignite2, ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
@Override public void apply(List<String> received, List<Thread> threads) {
assertFalse(threads.contains(Thread.currentThread()));
assertTrue(msgs.equals(received));
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testSendOrderedDefaultModeMultiThreads() throws Exception {
Ignite ignite = startGrid(1);
sendOrderedMultiThreads(ignite.message());
}
/**
* @throws Exception If failed.
*/
@Test
public void testSendOrderedDefaultModeMultiThreadsWith2Node() throws Exception {
Ignite ignite1 = startGrid(1);
Ignite ignite2 = startGrid(2);
sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message());
}
/**
* @param ignite2 Second node.
* @param ignMsg IgniteMessage.
* @throws Exception If failed.
*/
private void sendOrderedMultiThreadsWith2Node(
final Ignite ignite2,
final IgniteMessaging ignMsg
) throws Exception {
final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();
final List<String> msgs = orderedMessages();
sendOrderedMultiThreadsWith2Node(ignite2, ignMsg, expMsg, actlMsg, msgs);
}
/**
* @param ignMsg IgniteMessaging.
* @throws Exception If failed.
*/
private void sendOrderedMultiThreads(
final IgniteMessaging ignMsg
) throws Exception {
final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();
final List<String> msgs = orderedMessages();
sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs);
}
/**
* @param ignite2 Second node.
* @param ignMsg Ignite for send message.
* @param expMsg Expected messages map.
* @param actlMsg Actual message map.
* @param msgs List of messages.
* @throws Exception If failed.
*/
private void sendOrderedMultiThreadsWith2Node(
final Ignite ignite2,
final IgniteMessaging ignMsg,
final ConcurrentMap<String, List<String>> expMsg,
final ConcurrentMap<String, List<String>> actlMsg,
final List<String> msgs
) throws Exception {
final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size());
final ConcurrentMap<String, List<String>> actlMsgNode2 = Maps.newConcurrentMap();
ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
@Override public boolean apply(UUID uuid, Message msg) {
actlMsgNode2.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
actlMsgNode2.get(msg.threadName).add(msg.msg);
latch.countDown();
return true;
}
});
sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs);
latch.await();
assertEquals(expMsg.size(), actlMsgNode2.size());
for (Map.Entry<String, List<String>> entry : expMsg.entrySet())
assertTrue(actlMsgNode2.get(entry.getKey()).equals(entry.getValue()));
}
/**
* @param ignMsg Ignite for send message.
* @param expMsg Expected messages map.
* @param actlMsg Actual message map.
* @param msgs List of messages.
* @throws Exception If failed.
*/
private void sendOrderedMultiThreads(
final IgniteMessaging ignMsg,
final ConcurrentMap<String, List<String>> expMsg,
final ConcurrentMap<String, List<String>> actlMsg,
final List<String> msgs
) throws Exception {
final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size());
ignMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
@Override public boolean apply(UUID uuid, Message msg) {
actlMsg.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
actlMsg.get(msg.threadName).add(msg.msg);
latch.countDown();
return true;
}
});
for (int i = 0; i < THREADS; i++)
new Thread(new Runnable() {
@Override public void run() {
String thdName = Thread.currentThread().getName();
List<String> exp = Lists.newArrayList();
expMsg.put(thdName, exp);
for (String msg : msgs) {
exp.add(msg);
ignMsg.sendOrdered(TOPIC, new Message(thdName, msg), 1000);
}
}
}).start();
latch.await();
assertEquals(expMsg.size(), actlMsg.size());
for (Map.Entry<String, List<String>> entry : expMsg.entrySet())
assertTrue(actlMsg.get(entry.getKey()).equals(entry.getValue()));
}
/**
* @param ignite2 Second node.
* @param igniteMsg Ignite message.
* @param msgStr Message string.
* @param cls Callback for compare result.
* @param async Use sendAsync flag.
* @throws Exception If failed.
*/
private void sendWith2Nodes(
final Ignite ignite2,
final IgniteMessaging igniteMsg,
final String msgStr,
final IgniteBiInClosure<String, Thread> cls,
final boolean async
) throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
@Override public boolean apply(UUID uuid, String msg) {
Assert.assertEquals(msgStr, msg);
latch.countDown();
return true;
}
});
send(igniteMsg, msgStr, cls, async);
latch.await();
}
/**
* @param igniteMsg Ignite messaging.
* @param msgStr Message string.
* @param cls Callback for compare result.
* @param async Use sendAsync flag.
* @throws Exception If failed.
*/
private void send(
final IgniteMessaging igniteMsg,
final String msgStr,
final IgniteBiInClosure<String, Thread> cls,
final boolean async
) throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Thread> thread = new AtomicReference<>();
final AtomicReference<String> val = new AtomicReference<>();
igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
@Override public boolean apply(UUID uuid, String msgStr) {
thread.set(Thread.currentThread());
val.set(msgStr);
latch.countDown();
return true;
}
});
if (async)
igniteMsg.withAsync().send(TOPIC, msgStr);
else
igniteMsg.send(TOPIC, msgStr);
latch.await();
cls.apply(val.get(), thread.get());
}
/**
* @param ignite2 Second node.
* @param igniteMsg Ignite message.
* @param msgs messages for send.
* @param cls Callback for compare result.
* @throws Exception If failed.
*/
private void sendOrderedWith2Node(
final Ignite ignite2,
final IgniteMessaging igniteMsg,
final List<String> msgs,
final IgniteBiInClosure<List<String>, List<Thread>> cls
) throws Exception {
final CountDownLatch latch = new CountDownLatch(msgs.size());
final List<String> received = Lists.newArrayList();
ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
@Override public boolean apply(UUID uuid, String msg) {
received.add(msg);
latch.countDown();
return true;
}
});
sendOrdered(igniteMsg, msgs, cls);
latch.await();
assertTrue(msgs.equals(received));
}
/**
* @param igniteMsg Ignite message.
* @param msgs messages for send.
* @param cls Callback for compare result.
* @throws Exception If failed.
*/
private <T> void sendOrdered(
final IgniteMessaging igniteMsg,
final List<T> msgs,
final IgniteBiInClosure<List<T>, List<Thread>> cls
) throws Exception {
final CountDownLatch latch = new CountDownLatch(msgs.size());
final List<T> received = Lists.newArrayList();
final List<Thread> threads = Lists.newArrayList();
for (T msg : msgs)
igniteMsg.sendOrdered(TOPIC, msg, 1000);
igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, T>() {
@Override public boolean apply(UUID uuid, T s) {
received.add(s);
threads.add(Thread.currentThread());
latch.countDown();
return true;
}
});
latch.await();
cls.apply(received, threads);
}
/**
* @return List of ordered messages
*/
private List<String> orderedMessages() {
final List<String> msgs = Lists.newArrayList();
for (int i = 0; i < 1000; i++)
msgs.add(String.valueOf(ThreadLocalRandom.current().nextInt()));
return msgs;
}
/**
*
*/
private static class Message implements Serializable {
/** Thread name. */
private final String threadName;
/** Message. */
private final String msg;
/**
* @param threadName Thread name.
* @param msg Message.
*/
private Message(String threadName, String msg) {
this.threadName = threadName;
this.msg = msg;
}
}
}