blob: d8bcd78ce18e47c3903df66c9004f735ad97ae96 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.continuous;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.messaging.MessagingListenActor;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* Message listen test.
*/
public class GridMessageListenSelfTest extends GridCommonAbstractTest {
/** */
private static final int GRID_CNT = 3;
/** */
private static final String INC_ATTR = "include";
/** */
private static final String MSG = "Message";
/** */
private static final String TOPIC = "Topic";
/** */
private static final int MSG_CNT = 3;
/** */
private static final String TOPIC_CLS_NAME = "org.apache.ignite.tests.p2p.GridTestMessageTopic";
/** */
private static final String LSNR_CLS_NAME = "org.apache.ignite.tests.p2p.GridTestMessageListener";
/** */
private static boolean include;
/** */
private static final List<UUID> allNodes = new ArrayList<>();
/** */
private static final List<UUID> rmtNodes = new ArrayList<>();
/** */
private static final List<UUID> incNodes = new ArrayList<>();
/** */
private static final Collection<UUID> nodes = new GridConcurrentHashSet<>();
/** */
private static final AtomicInteger cnt = new AtomicInteger();
/** */
private static CountDownLatch latch;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
if (include)
cfg.setUserAttributes(F.asMap(INC_ATTR, true));
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
nodes.clear();
cnt.set(0);
include = true;
startGridsMultiThreaded(GRID_CNT - 1);
include = false;
Thread.sleep(500);
startGrid(GRID_CNT - 1);
allNodes.clear();
rmtNodes.clear();
incNodes.clear();
for (int i = 0; i < GRID_CNT; i++) {
UUID id = grid(i).localNode().id();
allNodes.add(id);
if (i != 0)
rmtNodes.add(id);
if (i != GRID_CNT - 1)
incNodes.add(id);
}
Collections.sort(allNodes);
Collections.sort(rmtNodes);
Collections.sort(incNodes);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
}
/**
* @throws Exception If failed.
*/
@Test
public void testNullTopic() throws Exception {
latch = new CountDownLatch(MSG_CNT * GRID_CNT);
listen(grid(0).cluster(), null, true);
send();
assertTrue(latch.await(2, SECONDS));
// Make sure that no more messages received than expected.
U.sleep(500);
assertEquals(MSG_CNT * GRID_CNT, cnt.get());
checkNodes(allNodes);
}
/**
* @throws Exception If failed.
*/
@Test
public void testNonNullTopic() throws Exception {
latch = new CountDownLatch(MSG_CNT * GRID_CNT);
listen(grid(0).cluster(), TOPIC, true);
send();
assertTrue(latch.await(2, SECONDS));
// Make sure that no more messages received than expected.
U.sleep(500);
assertEquals(MSG_CNT * GRID_CNT, cnt.get());
checkNodes(allNodes);
}
/**
* @throws Exception If failed.
*/
@Test
public void testStopListen() throws Exception {
latch = new CountDownLatch(GRID_CNT);
listen(grid(0).cluster(), null, false);
send();
assert latch.await(2, SECONDS);
Thread.sleep(500);
int expCnt = cnt.get();
send();
Thread.sleep(1000);
assertEquals(expCnt, cnt.get());
checkNodes(allNodes);
}
/**
* @throws Exception If failed.
*/
@Test
public void testProjection() throws Exception {
latch = new CountDownLatch(MSG_CNT * (GRID_CNT - 1));
listen(grid(0).cluster().forRemotes(), null, true);
send();
assert latch.await(2, SECONDS);
Thread.sleep(500);
assertEquals(MSG_CNT * (GRID_CNT - 1), cnt.get());
checkNodes(rmtNodes);
}
/**
* @throws Exception If failed.
*/
@Test
public void testNodeJoin() throws Exception {
latch = new CountDownLatch(MSG_CNT * (GRID_CNT + 1));
listen(grid(0).cluster(), null, true);
try {
Ignite g = startGrid("anotherGrid");
send();
assert latch.await(2, SECONDS);
Thread.sleep(500);
assertEquals(MSG_CNT * (GRID_CNT + 1), cnt.get());
List<UUID> allNodes0 = new ArrayList<>(allNodes);
allNodes0.add(g.cluster().localNode().id());
Collections.sort(allNodes0);
checkNodes(allNodes0);
}
finally {
stopGrid("anotherGrid");
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testNodeJoinWithProjection() throws Exception {
latch = new CountDownLatch(MSG_CNT * GRID_CNT);
listen(grid(0).cluster().forAttribute(INC_ATTR, null), null, true);
try {
include = true;
Ignite g = startGrid("anotherGrid1");
include = false;
startGrid("anotherGrid2");
send();
assert latch.await(2, SECONDS);
Thread.sleep(500);
assertEquals(MSG_CNT * GRID_CNT, cnt.get());
List<UUID> incNodes0 = new ArrayList<>(incNodes);
incNodes0.add(g.cluster().localNode().id());
Collections.sort(incNodes0);
checkNodes(incNodes0);
}
finally {
stopGrid("anotherGrid1");
stopGrid("anotherGrid2");
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testNullTopicWithDeployment() throws Exception {
Class<?> cls = getExternalClassLoader().loadClass(LSNR_CLS_NAME);
grid(0).message().remoteListen(null, (IgniteBiPredicate<UUID, Object>)cls.newInstance());
send();
boolean s = GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
return checkDeployedListeners(GRID_CNT);
}
}, 2000);
assertTrue(s);
}
/**
* @throws Exception If failed.
*/
@Test
public void testNonNullTopicWithDeployment() throws Exception {
ClassLoader ldr = getExternalClassLoader();
Class<?> topicCls = ldr.loadClass(TOPIC_CLS_NAME);
Class<?> lsnrCls = ldr.loadClass(LSNR_CLS_NAME);
Object topic = topicCls.newInstance();
grid(0).message().remoteListen(topic, (IgniteBiPredicate<UUID, Object>)lsnrCls.newInstance());
send(topic);
boolean s = GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
return checkDeployedListeners(GRID_CNT);
}
}, 2000);
assertTrue(s);
}
/**
* @throws Exception If failed.
*/
@Test
public void testListenActor() throws Exception {
latch = new CountDownLatch(MSG_CNT * (GRID_CNT + 1));
grid(0).message().remoteListen(null, new Actor(grid(0).localNode().id()));
try {
Ignite g = startGrid("anotherGrid");
send();
assert latch.await(2, SECONDS);
Thread.sleep(500);
assertEquals(MSG_CNT * (GRID_CNT + 1), cnt.get());
List<UUID> allNodes0 = new ArrayList<>(allNodes);
allNodes0.add(g.cluster().localNode().id());
Collections.sort(allNodes0);
checkNodes(allNodes0);
}
finally {
stopGrid("anotherGrid");
}
}
/**
* @param prj Projection.
* @param topic Topic.
* @param ret Value returned from listener.
* @throws Exception In case of error.
*/
private void listen(final ClusterGroup prj, @Nullable Object topic, final boolean ret) throws Exception {
assert prj != null;
message(prj).remoteListen(topic, new Listener(grid(0).localNode().id(), ret));
}
/**
* @throws Exception In case of error.
*/
private void send() throws Exception {
send(TOPIC);
}
/**
* @param topic Non-null topic.
* @throws Exception In case of error.
*/
private void send(Object topic) throws Exception {
assert topic != null;
for (int i = 0; i < MSG_CNT; i++)
grid(0).message().send(null, MSG);
for (int i = 0; i < MSG_CNT; i++)
grid(0).message().send(topic, MSG);
}
/**
* @param expCnt Expected messages count.
* @return If check passed.
*/
private boolean checkDeployedListeners(int expCnt) {
for (Ignite g : G.allGrids()) {
AtomicInteger cnt = g.cluster().<String, AtomicInteger>nodeLocalMap().get("msgCnt");
if (cnt == null || cnt.get() != expCnt)
return false;
}
return true;
}
/**
* @param expNodes Expected nodes.
*/
private void checkNodes(List<UUID> expNodes) {
List<UUID> nodes0 = new ArrayList<>(nodes);
Collections.sort(nodes0);
assertEquals(expNodes, nodes0);
}
/** */
private static class Listener implements P2<UUID, Object> {
/** Source node id. */
private final UUID sourceNodeId;
/** */
private final boolean ret;
/** */
@IgniteInstanceResource
private Ignite ignite;
/**
* @param ret Return value.
* @param sourceNodeId expected source node id
*/
private Listener(UUID sourceNodeId, boolean ret) {
this.ret = ret;
this.sourceNodeId = sourceNodeId;
}
/** {@inheritDoc} */
@Override public boolean apply(UUID nodeId, Object msg) {
assertNotNull(ignite);
assertNotNull(ignite.configuration().getNodeId());
X.println("Received message [nodeId=" + nodeId + ", locNodeId=" + ignite.cluster().localNode().id() + ']');
assertEquals(sourceNodeId, nodeId);
assertEquals(MSG, msg);
nodes.add(ignite.configuration().getNodeId());
cnt.incrementAndGet();
latch.countDown();
return ret;
}
}
/** */
private static class Actor extends MessagingListenActor<Object> {
/** Source node id. */
private final UUID sourceNodeId;
/**
* @param sourceNodeId expected source node id
*/
private Actor(UUID sourceNodeId) {
this.sourceNodeId = sourceNodeId;
}
/** {@inheritDoc} */
@Override protected void receive(UUID nodeId, Object msg) throws Throwable {
assertNotNull(ignite());
UUID locNodeId = ignite().cluster().localNode().id();
X.println("Received message [nodeId=" + nodeId + ", locNodeId=" + locNodeId + ']');
assertEquals(sourceNodeId, nodeId);
assertEquals(MSG, msg);
nodes.add(locNodeId);
cnt.incrementAndGet();
latch.countDown();
}
}
}