| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Future; |
| import javax.cache.CacheException; |
| import javax.cache.processor.EntryProcessorException; |
| import javax.cache.processor.MutableEntry; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteAtomicLong; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteClientDisconnectedException; |
| import org.apache.ignite.IgniteCompute; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.IgniteMessaging; |
| import org.apache.ignite.IgniteQueue; |
| import org.apache.ignite.IgniteSet; |
| import org.apache.ignite.IgniteTransactions; |
| import org.apache.ignite.cache.CacheEntryProcessor; |
| import org.apache.ignite.configuration.AtomicConfiguration; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.CollectionConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; |
| import org.apache.ignite.internal.util.typedef.C1; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiPredicate; |
| import org.apache.ignite.lang.IgniteCallable; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.apache.ignite.spi.discovery.DiscoverySpi; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.junit.Test; |
| |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; |
| import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; |
| |
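| /** |
| * Checks that API calls issued on a client node while it is disconnected fail with |
| * {@link IgniteClientDisconnectedException} or {@link CacheException}, and that the same calls |
| * return usable results when they are repeated after the failure has been handled. |
| */ |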
| public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnectAbstractTest { |
| /** Cache key used by the "put and invoke" check of {@code cacheOperationsTest()}, both for the failing attempt and for the retry. */ |
| private static final int CACHE_PUT_INVOKE_KEY = 10010; |
| |
| /** |
|  * Takes the configuration produced by the parent class and registers a cache configuration |
|  * named {@code DEFAULT_CACHE_NAME} on it. |
|  * |
|  * @param igniteInstanceName Name of the Ignite instance the configuration is requested for. |
|  * @return Parent configuration with the cache configuration set. |
|  * @throws Exception If the parent fails to build the configuration. |
|  */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
|     IgniteConfiguration nodeCfg = super.getConfiguration(igniteInstanceName); |
| |
|     CacheConfiguration dfltCacheCfg = new CacheConfiguration(DEFAULT_CACHE_NAME); |
| |
|     nodeCfg.setCacheConfiguration(dfltCacheCfg); |
| |
|     return nodeCfg; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Always {@code 1}. The scenarios in this class also pass this value as the index to |
| * {@code startClientGrid(...)}. Presumably the base class uses it as the number of server nodes |
| * to start - confirm in the abstract test. |
| */ |
| @Override protected int serverCount() { |
| return 1; |
| } |
| |
| /** |
| * Runs the three disconnect scenarios one after another: cache operations, data structure |
| * operations and general Ignite API operations. |
| * <p> |
| * Every scenario ends with {@code stopAllGrids()} (see the {@code finally} block of |
| * {@code doTestIgniteOperationOnDisconnect}), which is why {@code beforeTestsStarted()} is called |
| * before the second and the third scenario. Presumably it starts the server node(s) again - |
| * confirm in the base class. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testErrorOnDisconnect() throws Exception { |
| // Check cache operations. |
| cacheOperationsTest(); |
| |
| // Check data structure operations. |
| beforeTestsStarted(); |
| dataStructureOperationsTest(); |
| |
| // Check ignite operations. |
| beforeTestsStarted(); |
| igniteOperationsTest(); |
| } |
| |
| /** |
|  * Data structure scenario. An atomic long, a set and a queue are each requested once inside a |
|  * {@code try} block that is expected to end with {@link IgniteClientDisconnectedException}, and |
|  * then requested again. The object returned by the second request is handed to a checker that |
|  * verifies it is usable. |
|  * |
|  * @throws Exception If failed. |
|  */ |
| @SuppressWarnings("unchecked") |
| private void dataStructureOperationsTest() throws Exception { |
|     final Ignite client = startClientGrid(serverCount()); |
| |
|     // Atomic long: the first request must fail, the retry result is checked by incrementing it. |
|     T2<Callable, C1<Object, Boolean>> atomicLongOp = new T2<Callable, C1<Object, Boolean>>( |
|         new Callable() { |
|             @Override public Object call() throws Exception { |
|                 boolean disconnectSeen = false; |
| |
|                 try { |
|                     client.atomicLong("testAtomic", 41, true); |
|                 } |
|                 catch (IgniteClientDisconnectedException e) { |
|                     disconnectSeen = true; |
| |
|                     checkAndWait(e); |
|                 } |
| |
|                 assertTrue(disconnectSeen); |
| |
|                 return client.atomicLong("testAtomic", 41, true); |
|             } |
|         }, |
|         new C1<Object, Boolean>() { |
|             @Override public Boolean apply(Object res) { |
|                 assertNotNull(res); |
| |
|                 // The long is requested with 41, so one increment is expected to yield 42. |
|                 assertEquals(42, ((IgniteAtomicLong)res).incrementAndGet()); |
| |
|                 return true; |
|             } |
|         } |
|     ); |
| |
|     // Set: the first request must fail, the retry result must accept and report one element. |
|     T2<Callable, C1<Object, Boolean>> setOp = new T2<Callable, C1<Object, Boolean>>( |
|         new Callable() { |
|             @Override public Object call() throws Exception { |
|                 boolean disconnectSeen = false; |
| |
|                 try { |
|                     client.set("testSet", getCollectionConfiguration()); |
|                 } |
|                 catch (IgniteClientDisconnectedException e) { |
|                     disconnectSeen = true; |
| |
|                     checkAndWait(e); |
|                 } |
| |
|                 assertTrue(disconnectSeen); |
| |
|                 return client.set("testSet", getCollectionConfiguration()); |
|             } |
|         }, |
|         new C1<Object, Boolean>() { |
|             @Override public Boolean apply(Object res) { |
|                 assertNotNull(res); |
| |
|                 IgniteSet igniteSet = (IgniteSet)res; |
| |
|                 String elem = "testVal"; |
| |
|                 igniteSet.add(elem); |
| |
|                 assertEquals(1, igniteSet.size()); |
|                 assertTrue(igniteSet.contains(elem)); |
| |
|                 return true; |
|             } |
|         } |
|     ); |
| |
|     // Queue: the first request must fail, the retry result must return the element that was added. |
|     T2<Callable, C1<Object, Boolean>> queueOp = new T2<Callable, C1<Object, Boolean>>( |
|         new Callable() { |
|             @Override public Object call() throws Exception { |
|                 boolean disconnectSeen = false; |
| |
|                 try { |
|                     client.queue("TestQueue", 10, getCollectionConfiguration()); |
|                 } |
|                 catch (IgniteClientDisconnectedException e) { |
|                     disconnectSeen = true; |
| |
|                     checkAndWait(e); |
|                 } |
| |
|                 assertTrue(disconnectSeen); |
| |
|                 return client.queue("TestQueue", 10, getCollectionConfiguration()); |
|             } |
|         }, |
|         new C1<Object, Boolean>() { |
|             @Override public Boolean apply(Object res) { |
|                 assertNotNull(res); |
| |
|                 IgniteQueue igniteQueue = (IgniteQueue)res; |
| |
|                 String elem = "Test"; |
| |
|                 igniteQueue.add(elem); |
| |
|                 assertEquals(elem, igniteQueue.poll()); |
| |
|                 return true; |
|             } |
|         } |
|     ); |
| |
|     doTestIgniteOperationOnDisconnect(client, Arrays.asList(atomicLongOp, setOp, queueOp)); |
| } |
| |
| /** |
|  * Cache scenario. Every operation is first executed inside a {@code try} block that is expected to |
|  * end with {@link CacheException} (cache calls) or {@link IgniteClientDisconnectedException} |
|  * ({@link Ignite} facade calls), and is then executed again. The result of the second execution is |
|  * handed to a checker closure. |
|  * <p> |
|  * The casts of the untyped operation results are unchecked by design, hence the suppression. |
|  * |
|  * @throws Exception If failed. |
|  */ |
| @SuppressWarnings("unchecked") |
| private void cacheOperationsTest() throws Exception { |
|     final Ignite client = startClientGrid(serverCount()); |
| |
|     final IgniteCache<Object, Object> dfltCache = client.cache(DEFAULT_CACHE_NAME); |
| |
|     assertNotNull(dfltCache); |
| |
|     doTestIgniteOperationOnDisconnect(client, Arrays.asList( |
|         // Check put and get operation. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         dfltCache.getAndPut(9999, 9999); |
|                     } |
|                     catch (CacheException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     return dfltCache.getAndPut(9999, 9999); |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     // No previous value is expected: the first getAndPut() must not have been applied. |
|                     assertNull(o); |
| |
|                     assertEquals(9999, dfltCache.get(9999)); |
| |
|                     return true; |
|                 } |
|             } |
|         ), |
|         // Check put operation. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         dfltCache.put(10000, 10000); |
|                     } |
|                     catch (CacheException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     dfltCache.put(10000, 10000); |
| |
|                     // put() returns nothing, so a marker value is passed to the checker. |
|                     return true; |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     assertTrue((Boolean)o); |
| |
|                     assertEquals(10000, dfltCache.get(10000)); |
| |
|                     return true; |
|                 } |
|             } |
|         ), |
|         // Check get operation. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         dfltCache.get(10001); |
|                     } |
|                     catch (CacheException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     return dfltCache.get(10001); |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     // Key 10001 is never written by this scenario. |
|                     assertNull(o); |
| |
|                     return true; |
|                 } |
|             } |
|         ), |
|         // Check put and invoke operation. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         dfltCache.put(CACHE_PUT_INVOKE_KEY, 10000); |
|                         dfltCache.invoke(CACHE_PUT_INVOKE_KEY, new CacheEntryProcessor<Object, Object, Object>() { |
|                             @Override public Object process(MutableEntry<Object, Object> entry, |
|                                 Object... arguments) throws EntryProcessorException { |
|                                 assertTrue(entry.exists()); |
| |
|                                 return (int)entry.getValue() * 2; |
|                             } |
|                         }); |
|                     } |
|                     catch (CacheException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     dfltCache.put(CACHE_PUT_INVOKE_KEY, 10000); |
| |
|                     return dfltCache.invoke(CACHE_PUT_INVOKE_KEY, new CacheEntryProcessor<Object, Object, Object>() { |
|                         @Override public Object process(MutableEntry<Object, Object> entry, |
|                             Object... arguments) throws EntryProcessorException { |
|                             assertTrue(entry.exists()); |
| |
|                             return (int)entry.getValue() * 2; |
|                         } |
|                     }); |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     assertNotNull(o); |
| |
|                     // The processor doubles the stored value 10000. |
|                     assertEquals(20000, (int)o); |
| |
|                     return true; |
|                 } |
|             } |
|         ), |
|         // Check put async operation. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         dfltCache.putAsync(10002, 10002).get(); |
|                     } |
|                     catch (CacheException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     return dfltCache.putAsync(10002, 10002).get(); |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     assertNull(o); |
| |
|                     assertEquals(10002, dfltCache.get(10002)); |
| |
|                     return true; |
|                 } |
|             } |
|         ), |
|         // Check transaction. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         client.transactions(); |
|                     } |
|                     catch (IgniteClientDisconnectedException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     return client.transactions(); |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     IgniteTransactions txs = (IgniteTransactions)o; |
| |
|                     assertNotNull(txs); |
| |
|                     return true; |
|                 } |
|             } |
|         ), |
|         // Check get cache. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         client.cache(DEFAULT_CACHE_NAME); |
|                     } |
|                     catch (IgniteClientDisconnectedException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     return client.cache(DEFAULT_CACHE_NAME); |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     IgniteCache<Object, Object> cache0 = (IgniteCache<Object, Object>)o; |
| |
|                     assertNotNull(cache0); |
| |
|                     cache0.put(1, 1); |
| |
|                     assertEquals(1, cache0.get(1)); |
| |
|                     return true; |
|                 } |
|             } |
|         ), |
|         // Check streamer. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         client.dataStreamer(DEFAULT_CACHE_NAME); |
|                     } |
|                     catch (IgniteClientDisconnectedException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     return client.dataStreamer(DEFAULT_CACHE_NAME); |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     // try-with-resources closes the streamer even when addData() throws; on the |
|                     // normal path close() still runs before the value is read back below. |
|                     try (IgniteDataStreamer<Object, Object> streamer = (IgniteDataStreamer<Object, Object>)o) { |
|                         streamer.addData(2, 2); |
|                     } |
| |
|                     assertEquals(2, client.cache(DEFAULT_CACHE_NAME).get(2)); |
| |
|                     return true; |
|                 } |
|             } |
|         ), |
|         // Check create cache. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         client.createCache("test_cache"); |
|                     } |
|                     catch (IgniteClientDisconnectedException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     return client.createCache("test_cache"); |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     IgniteCache<Object, Object> cache = (IgniteCache<Object, Object>)o; |
| |
|                     assertNotNull(cache); |
| |
|                     cache.put(1, 1); |
| |
|                     assertEquals(1, cache.get(1)); |
| |
|                     return true; |
|                 } |
|             } |
|         ) |
|     )); |
| } |
| |
| /** |
|  * Ignite API scenario: compute, node ping, remote event listener, messaging and executor service. |
|  * Every operation is first executed inside a {@code try} block that is expected to end with |
|  * {@link IgniteClientDisconnectedException}, and is then executed again. The result of the second |
|  * execution is handed to a checker closure. |
|  * <p> |
|  * The casts of the untyped operation results are unchecked by design, hence the suppression. |
|  * |
|  * @throws Exception If failed. |
|  */ |
| @SuppressWarnings("unchecked") |
| private void igniteOperationsTest() throws Exception { |
|     final Ignite client = startClientGrid(serverCount()); |
| |
|     final IgniteCache<Object, Object> dfltCache = client.cache(DEFAULT_CACHE_NAME); |
| |
|     // Counted down by the message listener once the test message arrives. |
|     final CountDownLatch recvLatch = new CountDownLatch(1); |
| |
|     assertNotNull(dfltCache); |
| |
|     doTestIgniteOperationOnDisconnect(client, Arrays.asList( |
|         // Check compute. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         client.compute(); |
|                     } |
|                     catch (IgniteClientDisconnectedException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     return client.compute(); |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     IgniteCompute comp = (IgniteCompute)o; |
| |
|                     // Every job reports the id of the node it was executed on. |
|                     Collection<UUID> uuids = comp.broadcast(new IgniteCallable<UUID>() { |
|                         @IgniteInstanceResource |
|                         private Ignite ignite; |
| |
|                         @Override public UUID call() throws Exception { |
|                             return ignite.cluster().localNode().id(); |
|                         } |
|                     }); |
| |
|                     assertFalse(uuids.isEmpty()); |
| |
|                     for (UUID uuid : uuids) |
|                         assertNotNull(uuid); |
| |
|                     return true; |
|                 } |
|             } |
|         ), |
| |
|         // Check ping node. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         client.cluster().pingNode(new UUID(0, 0)); |
|                     } |
|                     catch (IgniteClientDisconnectedException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     return client.cluster().pingNode(new UUID(0, 0)); |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     Boolean pingNode = (Boolean)o; |
| |
|                     // The all-zero id is not expected to belong to any node. |
|                     assertFalse(pingNode); |
| |
|                     return true; |
|                 } |
|             } |
|         ), |
|         // Check register remote listener. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         client.events().remoteListen(null, new IgnitePredicate<Event>() { |
|                             @Override public boolean apply(Event event) { |
|                                 return true; |
|                             } |
|                         }); |
|                     } |
|                     catch (IgniteClientDisconnectedException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     return client.events().remoteListen(null, new IgnitePredicate<Event>() { |
|                         @Override public boolean apply(Event event) { |
|                             return true; |
|                         } |
|                     }); |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     UUID remoteId = (UUID)o; |
| |
|                     assertNotNull(remoteId); |
| |
|                     client.events().stopRemoteListen(remoteId); |
| |
|                     return true; |
|                 } |
|             } |
|         ), |
|         // Check message operation. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         client.message().remoteListen(null, new IgniteBiPredicate<UUID, Object>() { |
|                             @Override public boolean apply(UUID uuid, Object o) { |
|                                 // Constant-first comparison: safe for any payload, including null. |
|                                 if ("Test message.".equals(o)) |
|                                     recvLatch.countDown(); |
| |
|                                 return true; |
|                             } |
|                         }); |
|                     } |
|                     catch (IgniteClientDisconnectedException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     return client.message().remoteListen(null, new IgniteBiPredicate<UUID, Object>() { |
|                         @Override public boolean apply(UUID uuid, Object o) { |
|                             // Constant-first comparison: safe for any payload, including null. |
|                             if ("Test message.".equals(o)) |
|                                 recvLatch.countDown(); |
| |
|                             return true; |
|                         } |
|                     }); |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     assertNotNull(o); |
| |
|                     IgniteMessaging msg = client.message(); |
| |
|                     msg.send(null, "Test message."); |
| |
|                     try { |
|                         // await() returning false is the actual "not received" case, so the |
|                         // message is reported here rather than only on interruption. |
|                         if (!recvLatch.await(2, SECONDS)) |
|                             fail("Message wasn't received."); |
|                     } |
|                     catch (InterruptedException e) { |
|                         // Restore the interrupt status instead of silently dropping it. |
|                         Thread.currentThread().interrupt(); |
| |
|                         fail("Interrupted while waiting for the message: " + e); |
|                     } |
| |
|                     return true; |
|                 } |
|             } |
|         ), |
|         // Check executor. |
|         new T2<Callable, C1<Object, Boolean>>( |
|             new Callable() { |
|                 @Override public Object call() throws Exception { |
|                     boolean failed = false; |
| |
|                     try { |
|                         client.executorService().submit(new Callable<Integer>() { |
|                             @Override public Integer call() throws Exception { |
|                                 return 42; |
|                             } |
|                         }); |
|                     } |
|                     catch (IgniteClientDisconnectedException e) { |
|                         failed = true; |
| |
|                         checkAndWait(e); |
|                     } |
| |
|                     assertTrue(failed); |
| |
|                     return client.executorService().submit(new Callable<Integer>() { |
|                         @Override public Integer call() throws Exception { |
|                             return 42; |
|                         } |
|                     }); |
|                 } |
|             }, |
|             new C1<Object, Boolean>() { |
|                 @Override public Boolean apply(Object o) { |
|                     assertNotNull(o); |
| |
|                     Future<Integer> fut = (Future<Integer>)o; |
| |
|                     try { |
|                         assertEquals(42, (int)fut.get()); |
|                     } |
|                     catch (Exception e) { |
|                         // Keep the cause visible in the failure report. |
|                         fail("Failed submit task: " + e); |
|                     } |
| |
|                     return true; |
|                 } |
|             } |
|         ) |
|     )); |
| } |
| |
| /** |
|  * Drives one disconnect/reconnect cycle for the given operations: |
|  * <ol> |
|  * <li>blocks the client join through a {@code DiscoverySpiTestListener} and fails the client from |
|  * the server it is connected through;</li> |
|  * <li>on the disconnect event starts every operation asynchronously;</li> |
|  * <li>checks that no operation is done right after the disconnect and after two more seconds;</li> |
|  * <li>unblocks the join, waits for the reconnect event and requires every operation to finish |
|  * within two seconds and its checker to return {@code true} within two seconds.</li> |
|  * </ol> |
|  * All grids are stopped when the method exits, regardless of the outcome. |
|  * |
|  * @param client Client. |
|  * @param ops Operations closures: the first element is the operation, the second one checks its result. |
|  * @throws Exception If failed. |
|  */ |
| @SuppressWarnings("unchecked") |
| private void doTestIgniteOperationOnDisconnect(Ignite client, final List<T2<Callable, C1<Object, Boolean>>> ops) |
|     throws Exception { |
|     assertNotNull(client.cache(DEFAULT_CACHE_NAME)); |
| |
|     final IgniteDiscoverySpi clientSpi = spi0(client); |
| |
|     Ignite srv = clientRouter(client); |
| |
|     DiscoverySpi srvSpi = spi0(srv); |
| |
|     final CountDownLatch disconnectLatch = new CountDownLatch(1); |
| |
|     final CountDownLatch reconnectLatch = new CountDownLatch(1); |
| |
|     log.info("Block reconnect."); |
| |
|     DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); |
| |
|     clientSpi.setInternalListener(lsnr); |
|     lsnr.startBlockJoin(); |
| |
|     // Filled by the event listener before it counts down disconnectLatch. |
|     final List<IgniteInternalFuture<?>> futs = new ArrayList<>(); |
| |
|     client.events().localListen(new IgnitePredicate<Event>() { |
|         @Override public boolean apply(Event evt) { |
|             if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { |
|                 info("Disconnected: " + evt); |
| |
|                 // Reconnect must not have happened yet. |
|                 assertEquals(1, reconnectLatch.getCount()); |
| |
|                 for (T2<Callable, C1<Object, Boolean>> op : ops) |
|                     futs.add(GridTestUtils.runAsync(op.get1())); |
| |
|                 disconnectLatch.countDown(); |
|             } |
|             else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { |
|                 info("Reconnected: " + evt); |
| |
|                 reconnectLatch.countDown(); |
|             } |
| |
|             return true; |
|         } |
|     }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); |
| |
|     try { |
|         log.info("Fail client."); |
| |
|         srvSpi.failNode(client.cluster().localNode().id(), null); |
| |
|         waitReconnectEvent(disconnectLatch); |
| |
|         assertEquals(ops.size(), futs.size()); |
| |
|         for (IgniteInternalFuture<?> fut : futs) |
|             assertNotDone(fut); |
| |
|         // Give the operations time to complete by mistake; they must still be pending afterwards. |
|         U.sleep(2000); |
| |
|         for (IgniteInternalFuture<?> fut : futs) |
|             assertNotDone(fut); |
| |
|         log.info("Allow reconnect."); |
| |
|         lsnr.stopBlockJoin(); |
| |
|         waitReconnectEvent(reconnectLatch); |
| |
|         // Check operation after reconnect working. |
|         for (int i = 0; i < futs.size(); i++) { |
|             final int i0 = i; |
| |
|             try { |
|                 final Object futRes = futs.get(i0).get(2, SECONDS); |
| |
|                 assertTrue(GridTestUtils.runAsync(new Callable<Boolean>() { |
|                     @Override public Boolean call() throws Exception { |
|                         return ops.get(i0).get2().apply(futRes); |
|                     } |
|                 }).get(2, SECONDS)); |
|             } |
|             catch (IgniteFutureTimeoutCheckedException e) { |
|                 // Report through the test logger so the stack trace lands in the test log. |
|                 log.error("Operation timeout. Iteration: " + i + ".", e); |
| |
|                 fail("Operation timeout. Iteration: " + i + "."); |
|             } |
|         } |
|     } |
|     finally { |
|         try { |
|             lsnr.stopBlockJoin(); |
| |
|             for (IgniteInternalFuture<?> fut : futs) |
|                 fut.cancel(); |
|         } |
|         finally { |
|             // Must run even if the cleanup above throws; otherwise nodes leak into the next scenario. |
|             stopAllGrids(); |
|         } |
|     } |
| } |
| |
| /** |
|  * Builds the collection configuration used by the set and queue checks. |
|  * |
|  * @return New {@link CollectionConfiguration} whose backups are set to {@link AtomicConfiguration#DFLT_BACKUPS}. |
|  */ |
| private CollectionConfiguration getCollectionConfiguration() { |
|     CollectionConfiguration colCfg = new CollectionConfiguration(); |
| |
|     return colCfg.setBackups(AtomicConfiguration.DFLT_BACKUPS); |
| } |
| } |