| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal; |
| |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import javax.cache.event.CacheEntryEvent; |
| import javax.cache.event.CacheEntryUpdatedListener; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.cache.query.ContinuousQuery; |
| import org.apache.ignite.cache.query.QueryCursor; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; |
| import org.apache.ignite.internal.util.typedef.P2; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteRunnable; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.junit.Test; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; |
| |
| /** |
| * |
| */ |
| public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientReconnectAbstractTest { |
| /** */ |
| private static volatile CountDownLatch latch; |
| |
| /** {@inheritDoc} */ |
| @Override protected int serverCount() { |
| return 3; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected int clientCount() { |
| return 1; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testEventListenerReconnect() throws Exception { |
| Ignite client = grid(serverCount()); |
| |
| assertTrue(client.cluster().localNode().isClient()); |
| |
| Ignite srv = ignite(0); |
| |
| IgniteDiscoverySpi srvSpi = spi0(srv); |
| |
| EventListener lsnr = new EventListener(); |
| |
| UUID opId = client.events().remoteListen(lsnr, null, EventType.EVT_JOB_STARTED); |
| |
| lsnr.latch = new CountDownLatch(1); |
| |
| log.info("Created remote listener: " + opId); |
| |
| final CountDownLatch reconnectLatch = new CountDownLatch(1); |
| |
| client.events().localListen(new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { |
| info("Reconnected: " + evt); |
| |
| reconnectLatch.countDown(); |
| } |
| |
| return true; |
| } |
| }, EVT_CLIENT_NODE_RECONNECTED); |
| |
| srvSpi.failNode(client.cluster().localNode().id(), null); |
| |
| waitReconnectEvent(reconnectLatch); |
| |
| client.compute().run(new DummyJob()); |
| |
| assertTrue(lsnr.latch.await(5000, MILLISECONDS)); |
| |
| lsnr.latch = new CountDownLatch(1); |
| |
| srv.compute().run(new DummyJob()); |
| |
| assertTrue(lsnr.latch.await(5000, MILLISECONDS)); |
| |
| lsnr.latch = new CountDownLatch(1); |
| |
| log.info("Stop listen, should not get events anymore."); |
| |
| client.events().stopRemoteListen(opId); |
| |
| assertFalse(lsnr.latch.await(3000, MILLISECONDS)); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMessageListenerReconnectAndStopFromServer() throws Exception { |
| testMessageListenerReconnect(false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMessageListenerReconnectAndStopFromClient() throws Exception { |
| testMessageListenerReconnect(true); |
| } |
| |
| /** |
| * @param stopFromClient If {@code true} stops listener from client node, otherwise from server. |
| * @throws Exception If failed. |
| */ |
| private void testMessageListenerReconnect(boolean stopFromClient) throws Exception { |
| Ignite client = grid(serverCount()); |
| |
| assertTrue(client.cluster().localNode().isClient()); |
| |
| Ignite srv = ignite(0); |
| |
| IgniteDiscoverySpi srvSpi = spi0(srv); |
| |
| final String topic = "testTopic"; |
| |
| MessageListener locLsnr = new MessageListener(); |
| |
| UUID opId = client.message().remoteListen(topic, new RemoteMessageListener()); |
| |
| client.message().localListen(topic, locLsnr); |
| |
| final CountDownLatch reconnectLatch = new CountDownLatch(1); |
| |
| client.events().localListen(new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { |
| info("Reconnected: " + evt); |
| |
| reconnectLatch.countDown(); |
| } |
| |
| return true; |
| } |
| }, EVT_CLIENT_NODE_RECONNECTED); |
| |
| srvSpi.failNode(client.cluster().localNode().id(), null); |
| |
| waitReconnectEvent(reconnectLatch); |
| |
| locLsnr.latch = new CountDownLatch(1); |
| latch = new CountDownLatch(2); |
| |
| client.message().send(topic, "msg1"); |
| |
| assertTrue(locLsnr.latch.await(5000, MILLISECONDS)); |
| assertTrue(latch.await(5000, MILLISECONDS)); |
| |
| locLsnr.latch = new CountDownLatch(1); |
| latch = new CountDownLatch(2); |
| |
| srv.message().send(topic, "msg2"); |
| |
| assertTrue(locLsnr.latch.await(5000, MILLISECONDS)); |
| assertTrue(latch.await(5000, MILLISECONDS)); |
| |
| Ignite stopFrom = (stopFromClient ? client : srv); |
| |
| log.info("Stop listen, should not get remote messages anymore [from=" + stopFrom.name() + ']'); |
| |
| stopFrom.message().stopRemoteListen(opId); |
| |
| srv.message().send(topic, "msg3"); |
| |
| locLsnr.latch = new CountDownLatch(1); |
| latch = new CountDownLatch(1); |
| |
| assertTrue(locLsnr.latch.await(5000, MILLISECONDS)); |
| assertFalse(latch.await(3000, MILLISECONDS)); |
| |
| log.info("New nodes should not register stopped listeners."); |
| |
| startGrid(serverCount() + 1); |
| |
| srv.message().send(topic, "msg4"); |
| |
| locLsnr.latch = new CountDownLatch(1); |
| latch = new CountDownLatch(1); |
| |
| assertTrue(locLsnr.latch.await(5000, MILLISECONDS)); |
| assertFalse(latch.await(3000, MILLISECONDS)); |
| |
| stopGrid(serverCount() + 1); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testCacheContinuousQueryReconnect() throws Exception { |
| Ignite client = grid(serverCount()); |
| |
| assertTrue(client.cluster().localNode().isClient()); |
| |
| IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); |
| |
| CacheEventListener lsnr = new CacheEventListener(); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| qry.setAutoUnsubscribe(true); |
| |
| qry.setLocalListener(lsnr); |
| |
| QueryCursor<?> cur = clientCache.query(qry); |
| |
| for (int i = 0; i < 5; i++) { |
| log.info("Iteration: " + i); |
| |
| continuousQueryReconnect(client, clientCache, lsnr); |
| } |
| |
| log.info("Close cursor, should not get cache events anymore."); |
| |
| cur.close(); |
| |
| lsnr.latch = new CountDownLatch(1); |
| |
| clientCache.put(3, 3); |
| |
| assertFalse(lsnr.latch.await(3000, MILLISECONDS)); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testCacheContinuousQueryReconnectNewServer() throws Exception { |
| Ignite client = grid(serverCount()); |
| |
| assertTrue(client.cluster().localNode().isClient()); |
| |
| IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); |
| |
| CacheEventListener lsnr = new CacheEventListener(); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| qry.setAutoUnsubscribe(true); |
| |
| qry.setLocalListener(lsnr); |
| |
| QueryCursor<?> cur = clientCache.query(qry); |
| |
| continuousQueryReconnect(client, clientCache, lsnr); |
| |
| // Check new server registers listener for reconnected client. |
| try (Ignite newSrv = startGrid(serverCount() + 1)) { |
| awaitPartitionMapExchange(); |
| |
| lsnr.latch = new CountDownLatch(10); |
| |
| IgniteCache<Object, Object> newSrvCache = newSrv.cache(DEFAULT_CACHE_NAME); |
| |
| for (Integer key : primaryKeys(newSrvCache, 10)) |
| newSrvCache.put(key, key); |
| |
| assertTrue(lsnr.latch.await(5000, MILLISECONDS)); |
| } |
| |
| cur.close(); |
| |
| // Check new server does not register listener for closed query. |
| try (Ignite newSrv = startGrid(serverCount() + 1)) { |
| awaitPartitionMapExchange(); |
| |
| lsnr.latch = new CountDownLatch(5); |
| |
| IgniteCache<Object, Object> newSrvCache = newSrv.cache(DEFAULT_CACHE_NAME); |
| |
| for (Integer key : primaryKeys(newSrvCache, 5)) |
| newSrvCache.put(key, key); |
| |
| assertFalse(lsnr.latch.await(3000, MILLISECONDS)); |
| } |
| } |
| |
| /** |
| * @param client Client. |
| * @param clientCache Client cache. |
| * @param lsnr Continuous query listener. |
| * @throws Exception If failed. |
| */ |
| private void continuousQueryReconnect(Ignite client, |
| IgniteCache<Object, Object> clientCache, |
| CacheEventListener lsnr) |
| throws Exception |
| { |
| Ignite srv = ignite(0); |
| |
| IgniteDiscoverySpi srvSpi = spi0(srv); |
| |
| final CountDownLatch reconnectLatch = new CountDownLatch(1); |
| |
| IgnitePredicate<Event> p = new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { |
| info("Reconnected: " + evt); |
| |
| reconnectLatch.countDown(); |
| } |
| |
| return true; |
| } |
| }; |
| |
| client.events().localListen(p, EVT_CLIENT_NODE_RECONNECTED); |
| |
| srvSpi.failNode(client.cluster().localNode().id(), null); |
| |
| waitReconnectEvent(reconnectLatch); |
| |
| client.events().stopLocalListen(p); |
| |
| lsnr.latch = new CountDownLatch(1); |
| |
| clientCache.put(1, 1); |
| |
| assertTrue(lsnr.latch.await(5000, MILLISECONDS)); |
| |
| lsnr.latch = new CountDownLatch(1); |
| |
| srv.cache(DEFAULT_CACHE_NAME).put(2, 2); |
| |
| assertTrue(lsnr.latch.await(5000, MILLISECONDS)); |
| } |
| |
| /** |
| * |
| */ |
| private static class EventListener implements P2<UUID, Event> { |
| /** */ |
| private volatile CountDownLatch latch; |
| |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** {@inheritDoc} */ |
| @Override public boolean apply(UUID uuid, Event evt) { |
| assertTrue(ignite.cluster().localNode().isClient()); |
| |
| ignite.log().info("Received event: " + evt); |
| |
| if (latch != null) |
| latch.countDown(); |
| |
| return true; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class MessageListener implements P2<UUID, Object> { |
| /** */ |
| private volatile CountDownLatch latch; |
| |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** {@inheritDoc} */ |
| @Override public boolean apply(UUID uuid, Object msg) { |
| assertTrue(ignite.cluster().localNode().isClient()); |
| |
| ignite.log().info("Local listener received message: " + msg); |
| |
| if (latch != null) |
| latch.countDown(); |
| |
| return true; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class RemoteMessageListener implements P2<UUID, Object> { |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** {@inheritDoc} */ |
| @Override public boolean apply(UUID uuid, Object msg) { |
| ignite.log().info("Remote listener received message: " + msg); |
| |
| if (latch != null) |
| latch.countDown(); |
| |
| return true; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> { |
| /** */ |
| private volatile CountDownLatch latch; |
| |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** {@inheritDoc} */ |
| @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { |
| int cnt = 0; |
| |
| for (CacheEntryEvent<?, ?> evt : evts) { |
| ignite.log().info("Received cache event: " + evt); |
| |
| cnt++; |
| } |
| |
| assertEquals(1, cnt); |
| |
| if (latch != null) |
| latch.countDown(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| static class DummyJob implements IgniteRunnable { |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| ignite.log().info("Job run."); |
| } |
| } |
| } |