blob: 288ccc006c42eb79860c9398903efbbe465fda2e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.messaging;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.testframework.junits.IgniteConfigVariationsAbstractTest;
import org.junit.Test;
/**
* Checks Ignite messaging: listener registration and message delivery between server and client nodes.
*/
public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVariationsAbstractTest {
/**
 * Topic used by the remote-listen scenarios of this test for both listening and sending.
 */
private static final String MESSAGE_TOPIC = "topic";
/** Number of messages every scenario sends. */
private static final int MSGS = 100;
/**
 * Latch counted down once per listener invocation. Each scenario replaces it with a fresh
 * instance sized to the expected number of deliveries before it registers its listener.
 */
public static CountDownLatch LATCH;
/**
 * {@inheritDoc}
 * <p>
 * Here the instance names at index {@code CLIENT_NODE_IDX}, 3 and 5 are reported as clients.
 */
@Override protected boolean expectedClient(String testGridName) {
    if (getTestIgniteInstanceName(CLIENT_NODE_IDX).equals(testGridName))
        return true;

    if (getTestIgniteInstanceName(3).equals(testGridName))
        return true;

    return getTestIgniteInstanceName(5).equals(testGridName);
}
/**
 * Runs the single-server scenario with the async send flag off through {@code runInAllDataModes}.
 *
 * @throws Exception If failed.
 */
@Test
public void testLocalServer() throws Exception {
    TestRunnable scenario = new TestRunnable() {
        @Override public void run() throws Exception {
            localServerInternal(false);
        }
    };

    runInAllDataModes(scenario);
}
/**
 * Runs the single-server scenario with the async send flag on through {@code runInAllDataModes}.
 *
 * @throws Exception If failed.
 */
@Test
public void testLocalServerAsync() throws Exception {
    TestRunnable scenario = new TestRunnable() {
        @Override public void run() throws Exception {
            localServerInternal(true);
        }
    };

    runInAllDataModes(scenario);
}
/**
 * Runs the local-listener scenario through {@code runInAllDataModes}.
 *
 * @throws Exception If failed.
 */
@Test
public void testLocalListener() throws Exception {
    TestRunnable scenario = new TestRunnable() {
        @Override public void run() throws Exception {
            localListenerInternal();
        }
    };

    runInAllDataModes(scenario);
}
/**
 * Runs the server-to-clients scenario with the async send flag off. Does nothing unless
 * {@code testsCfg.withClients()} is set.
 *
 * @throws Exception If failed.
 */
@Test
public void testServerClientMessage() throws Exception {
    if (testsCfg.withClients()) {
        TestRunnable scenario = new TestRunnable() {
            @Override public void run() throws Exception {
                serverClientMessage(false);
            }
        };

        runInAllDataModes(scenario);
    }
}
/**
 * Runs the server-to-clients scenario with the async send flag on. Does nothing unless
 * {@code testsCfg.withClients()} is set.
 *
 * @throws Exception If failed.
 */
@Test
public void testServerClientMessageAsync() throws Exception {
    if (testsCfg.withClients()) {
        TestRunnable scenario = new TestRunnable() {
            @Override public void run() throws Exception {
                serverClientMessage(true);
            }
        };

        runInAllDataModes(scenario);
    }
}
/**
 * Runs the client-to-clients scenario with the async send flag off. Does nothing unless
 * {@code testsCfg.withClients()} is set.
 *
 * @throws Exception If failed.
 */
@Test
public void testClientClientMessage() throws Exception {
    if (testsCfg.withClients()) {
        TestRunnable scenario = new TestRunnable() {
            @Override public void run() throws Exception {
                clientClientMessage(false);
            }
        };

        runInAllDataModes(scenario);
    }
}
/**
 * Runs the client-to-clients scenario with the async send flag on. Does nothing unless
 * {@code testsCfg.withClients()} is set.
 *
 * @throws Exception If failed.
 */
@Test
public void testClientClientMessageAsync() throws Exception {
    if (testsCfg.withClients()) {
        TestRunnable scenario = new TestRunnable() {
            @Override public void run() throws Exception {
                clientClientMessage(true);
            }
        };

        runInAllDataModes(scenario);
    }
}
/**
 * Runs the client-to-servers scenario with the async send flag off. Does nothing unless
 * {@code testsCfg.withClients()} is set.
 *
 * @throws Exception If failed.
 */
@Test
public void testClientServerMessage() throws Exception {
    if (testsCfg.withClients()) {
        TestRunnable scenario = new TestRunnable() {
            @Override public void run() throws Exception {
                clientServerMessage(false);
            }
        };

        runInAllDataModes(scenario);
    }
}
/**
 * Runs the client-to-servers scenario with the async send flag on. Does nothing unless
 * {@code testsCfg.withClients()} is set.
 *
 * @throws Exception If failed.
 */
@Test
public void testClientServerMessageAsync() throws Exception {
    if (testsCfg.withClients()) {
        TestRunnable scenario = new TestRunnable() {
            @Override public void run() throws Exception {
                clientServerMessage(true);
            }
        };

        runInAllDataModes(scenario);
    }
}
/**
 * Runs the collection-send scenario through {@code runInAllDataModes}.
 *
 * @throws Exception If failed.
 */
@Test
public void testCollectionMessage() throws Exception {
    TestRunnable scenario = new TestRunnable() {
        @Override public void run() throws Exception {
            collectionMessage();
        }
    };

    runInAllDataModes(scenario);
}
/**
 * Runs the ordered-send scenario started from the server node through {@code runInAllDataModes}.
 *
 * @throws Exception If failed.
 */
@Test
public void testOrderedMessage() throws Exception {
    TestRunnable scenario = new TestRunnable() {
        @Override public void run() throws Exception {
            orderedMessage();
        }
    };

    runInAllDataModes(scenario);
}
/**
 * Runs the ordered client-to-servers scenario. Does nothing unless
 * {@code testsCfg.withClients()} is set.
 *
 * @throws Exception If failed.
 */
@Test
public void testClientServerOrderedMessage() throws Exception {
    if (testsCfg.withClients()) {
        TestRunnable scenario = new TestRunnable() {
            @Override public void run() throws Exception {
                clientServerOrderedMessage();
            }
        };

        runInAllDataModes(scenario);
    }
}
/**
 * Runs the ordered client-to-clients scenario. Does nothing unless
 * {@code testsCfg.withClients()} is set.
 *
 * @throws Exception If failed.
 */
@Test
public void testClientClientOrderedMessage() throws Exception {
    if (testsCfg.withClients()) {
        TestRunnable scenario = new TestRunnable() {
            @Override public void run() throws Exception {
                clientClientOrderedMessage();
            }
        };

        runInAllDataModes(scenario);
    }
}
/**
 * Runs the ordered server-to-clients scenario. Does nothing unless
 * {@code testsCfg.withClients()} is set.
 *
 * @throws Exception If failed.
 */
@Test
public void testServerClientOrderedMessage() throws Exception {
    if (testsCfg.withClients()) {
        TestRunnable scenario = new TestRunnable() {
            @Override public void run() throws Exception {
                serverClientOrderedMessage();
            }
        };

        runInAllDataModes(scenario);
    }
}
/**
 * Single-server scenario: a listener is registered on the server's local-node group, the server
 * sends {@code MSGS} messages to that group and the test waits up to 10 seconds for the latch
 * to reach zero. The listener is stopped afterwards in any case.
 *
 * @param async Async message send flag.
 * @throws Exception If failed.
 */
private void localServerInternal(boolean async) throws Exception {
    Ignite srv = grid(SERVER_NODE_IDX);
    ClusterGroup locGrp = srv.cluster().forLocal();

    LATCH = new CountDownLatch(MSGS);

    UUID lsnrId = registerListener(locGrp);

    try {
        int sent = 0;

        while (sent < MSGS)
            sendMessage(srv, locGrp, value(sent++), async);

        boolean delivered = LATCH.await(10, TimeUnit.SECONDS);

        assertTrue(delivered);
    }
    finally {
        srv.message().stopRemoteListen(lsnrId);
    }
}
/**
 * Single-server scenario with a local listener: the server subscribes a {@link MessageListener}
 * locally, sends {@code MSGS} messages to its own local-node group and waits up to 10 seconds
 * for the latch to reach zero. The listener is removed afterwards in any case.
 *
 * @throws Exception If failed.
 */
private void localListenerInternal() throws Exception {
    final String topic = "localListenerTopic";

    Ignite srv = grid(SERVER_NODE_IDX);
    ClusterGroup locGrp = srv.cluster().forLocal();
    MessageListener lsnr = new MessageListener();

    LATCH = new CountDownLatch(MSGS);

    try {
        srv.message(locGrp).localListen(topic, lsnr);

        for (int sent = 0; sent < MSGS; sent++)
            srv.message(locGrp).send(topic, value(sent));

        boolean delivered = LATCH.await(10, TimeUnit.SECONDS);

        assertTrue(delivered);
    }
    finally {
        srv.message().stopLocalListen(topic, lsnr);
    }
}
/**
 * The server node sends messages to the group of client nodes.
 *
 * @param async Async message send flag.
 * @throws Exception If failed.
 */
private void serverClientMessage(boolean async) throws Exception {
    Ignite srv = grid(SERVER_NODE_IDX);
    ClusterGroup clients = srv.cluster().forClients();

    assert !clients.nodes().isEmpty();

    registerListenerAndSendMessages(srv, clients, async);
}
/**
 * The client node sends messages to the group of client nodes.
 *
 * @param async Async message send flag.
 * @throws Exception If failed.
 */
private void clientClientMessage(boolean async) throws Exception {
    Ignite client = grid(CLIENT_NODE_IDX);
    ClusterGroup clients = client.cluster().forClients();

    assert !clients.nodes().isEmpty();

    registerListenerAndSendMessages(client, clients, async);
}
/**
 * The client node sends messages to the group of server nodes.
 *
 * @param async Async message send flag.
 * @throws Exception If failed.
 */
private void clientServerMessage(boolean async) throws Exception {
    Ignite client = grid(CLIENT_NODE_IDX);
    ClusterGroup servers = client.cluster().forServers();

    assert !servers.nodes().isEmpty();

    registerListenerAndSendMessages(client, servers, async);
}
/**
 * Registers a listener on the given group, sends {@code MSGS} messages to the group from
 * {@code ignite} and waits up to 10 seconds until the listener has been invoked
 * {@code nodes * MSGS} times. The listener is stopped afterwards even if the wait fails.
 *
 * @param ignite Node that sends the messages and stops the listener.
 * @param grp Cluster group the messages are sent to; must contain at least one node.
 * @param async Async message send flag.
 * @throws Exception If fail.
 */
private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception {
    int nodeCnt = grp.nodes().size();

    // An empty group would give a zero-count latch, and the wait below would then succeed
    // without a single delivery. Fail explicitly instead of relying on JVM-level asserts,
    // which are skipped when assertions are disabled.
    assertTrue(nodeCnt > 0);

    LATCH = new CountDownLatch(nodeCnt * MSGS);

    UUID opId = registerListener(grp);

    try {
        for (int i = 0; i < MSGS; i++)
            sendMessage(ignite, grp, value(i), async);

        assertTrue(LATCH.await(10, TimeUnit.SECONDS));
    }
    finally {
        ignite.message().stopRemoteListen(opId);
    }
}
/**
 * Collection scenario: the server registers a {@link MessageListener} on the target group and
 * sends all {@code MSGS} messages with a single collection {@code send} call, then waits up to
 * 10 seconds until the listener has been invoked {@code nodes * MSGS} times. The target group is
 * the remote nodes when {@code gridCount()} is greater than 1 and the local node otherwise.
 *
 * @throws Exception If fail.
 */
private void collectionMessage() throws Exception {
    Ignite ignite = grid(SERVER_NODE_IDX);

    ClusterGroup grp = gridCount() > 1 ? ignite.cluster().forRemotes() : ignite.cluster().forLocal();

    int nodeCnt = grp.nodes().size();

    // A JVM-level assert is skipped when assertions are disabled; an empty group would then
    // give a zero-count latch and the scenario would pass without any delivery.
    assertTrue(nodeCnt > 0);

    LATCH = new CountDownLatch(nodeCnt * MSGS);

    UUID opId = ignite.message(grp).remoteListen(MESSAGE_TOPIC, new MessageListener());

    try {
        List<Object> msgs = new ArrayList<>(MSGS);

        for (int i = 0; i < MSGS; i++)
            msgs.add(value(i));

        ignite.message(grp).send(MESSAGE_TOPIC, msgs);

        assertTrue(LATCH.await(10, TimeUnit.SECONDS));
    }
    finally {
        ignite.message().stopRemoteListen(opId);
    }
}
/**
 * Ordered scenario started from the server node. The target group is the remote nodes when
 * {@code gridCount()} is greater than 1 and the local node otherwise.
 *
 * @throws Exception If fail.
 */
private void orderedMessage() throws Exception {
    Ignite srv = grid(SERVER_NODE_IDX);

    ClusterGroup targets;

    if (gridCount() > 1)
        targets = srv.cluster().forRemotes();
    else
        targets = srv.cluster().forLocal();

    assert !targets.nodes().isEmpty();

    registerListenerAndSendOrderedMessages(srv, targets);
}
/**
 * The client node sends ordered messages to the group of server nodes.
 *
 * @throws Exception If fail.
 */
private void clientServerOrderedMessage() throws Exception {
    Ignite client = grid(CLIENT_NODE_IDX);
    ClusterGroup servers = client.cluster().forServers();

    assert !servers.nodes().isEmpty();

    registerListenerAndSendOrderedMessages(client, servers);
}
/**
 * The client node sends ordered messages to the group of client nodes.
 *
 * @throws Exception If fail.
 */
private void clientClientOrderedMessage() throws Exception {
    Ignite client = grid(CLIENT_NODE_IDX);
    ClusterGroup clients = client.cluster().forClients();

    assert !clients.nodes().isEmpty();

    registerListenerAndSendOrderedMessages(client, clients);
}
/**
 * The server node sends ordered messages to the group of client nodes.
 *
 * @throws Exception If fail.
 */
private void serverClientOrderedMessage() throws Exception {
    Ignite srv = grid(SERVER_NODE_IDX);
    ClusterGroup clients = srv.cluster().forClients();

    assert !clients.nodes().isEmpty();

    registerListenerAndSendOrderedMessages(srv, clients);
}
/**
 * Registers an {@link OrderedMessageListener} on the given group through {@code ignite}, sends
 * {@code MSGS} messages with {@code sendOrdered} and waits up to 10 seconds until the listener
 * has been invoked {@code nodes * MSGS} times. The listener is stopped afterwards even if the
 * wait fails.
 *
 * @param ignite Node that registers the listener, sends the messages and stops the listener.
 * @param grp Cluster group the messages are sent to; must contain at least one node.
 * @throws Exception If fail.
 */
private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp) throws Exception {
    int nodeCnt = grp.nodes().size();

    // An empty group would give a zero-count latch, and the wait below would then succeed
    // without a single delivery. Fail explicitly instead of relying on JVM-level asserts,
    // which are skipped when assertions are disabled.
    assertTrue(nodeCnt > 0);

    LATCH = new CountDownLatch(nodeCnt * MSGS);

    UUID opId = ignite.message(grp).remoteListen(MESSAGE_TOPIC, new OrderedMessageListener());

    try {
        for (int i = 0; i < MSGS; i++)
            ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000);

        assertTrue(LATCH.await(10, TimeUnit.SECONDS));
    }
    finally {
        ignite.message().stopRemoteListen(opId);
    }
}
/**
 * Sends one message on {@code MESSAGE_TOPIC} to the given group. With the async flag set the
 * send goes through the messaging facade returned by {@code withAsync()}.
 *
 * @param nodeSnd Sender Ignite node.
 * @param grp Cluster group.
 * @param msg Message.
 * @param async Async message send flag.
 */
private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg, boolean async) {
    if (!async) {
        nodeSnd.message(grp).send(MESSAGE_TOPIC, msg);

        return;
    }

    nodeSnd.message(grp).withAsync().send(MESSAGE_TOPIC, msg);
}
/**
 * Registers a {@link MessageListener} for {@code MESSAGE_TOPIC} on the given group.
 * <p>
 * The registration goes through the node the group was obtained from ({@code grp.ignite()}).
 * Every caller in this class stops the listener through that same node, so the listener is
 * started and stopped via one Ignite instance, as the collection and ordered scenarios do.
 *
 * @param grp Cluster group.
 * @return Message listener uuid.
 * @throws Exception If failed.
 */
private UUID registerListener(ClusterGroup grp) throws Exception {
    IgniteBiPredicate<UUID, Object> lsnr = new MessageListener();

    return grp.ignite().message(grp).remoteListen(MESSAGE_TOPIC, lsnr);
}
/**
 * Listener that counts down {@link IgniteMessagingConfigVariationFullApiTest#LATCH} once for
 * every message it is invoked with.
 */
private static class MessageListener implements IgniteBiPredicate<UUID, Object> {
    /**
     * Creates the listener; it keeps no state of its own.
     */
    public MessageListener() {
        // No-op.
    }

    /**
     * Counts down the shared latch; the node id and the payload are not inspected.
     *
     * @param nodeId Node id passed by the messaging layer.
     * @param msg Received message.
     * @return Always {@code true}.
     */
    @Override public boolean apply(UUID nodeId, Object msg) {
        LATCH.countDown();

        return true;
    }
}
/**
 * Listener that checks arrival order: the n-th message an instance receives (counting from
 * zero) must carry the value n. Each accepted message counts down
 * {@link IgniteMessagingConfigVariationFullApiTest#LATCH}.
 */
private static class OrderedMessageListener implements IgniteBiPredicate<UUID, TestObject> {
    /**
     * Number of messages this instance has been invoked with so far.
     */
    private AtomicInteger cntr;

    /**
     * Creates a listener whose counter starts at zero.
     */
    OrderedMessageListener() {
        this.cntr = new AtomicInteger();
    }

    /**
     * Asserts that the message value equals the current counter, advances the counter and
     * counts down the shared latch.
     *
     * @param nodeId Node id passed by the messaging layer.
     * @param msg Received message.
     * @return Always {@code true}.
     */
    @Override public boolean apply(UUID nodeId, TestObject msg) {
        int expected = cntr.getAndIncrement();

        assertEquals(expected, msg.value());

        LATCH.countDown();

        return true;
    }
}
}