| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal; |
| |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.net.SocketException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteClientDisconnectedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.Ignition; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.failure.AbstractFailureHandler; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.failure.TestFailureHandler; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.apache.ignite.spi.IgniteSpiException; |
| import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; |
| import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; |
| import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.junits.WithSystemProperty; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; |
| |
| /** |
| * Tests client to be able restore connection to cluster if coordination is not available. |
| */ |
| @WithSystemProperty(key = IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, value = "true") |
| public class IgniteClientRejoinTest extends GridCommonAbstractTest { |
| /** Block. */ |
| private volatile boolean block; |
| |
| /** Block all. */ |
| private volatile boolean blockAll; |
| |
| /** Coordinator. */ |
| private volatile ClusterNode crd; |
| |
| /** Client reconnect disabled. */ |
| private boolean clientReconnectDisabled; |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| stopAllGrids(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| clientReconnectDisabled = false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(gridName); |
| |
| if (gridName.contains("client")) { |
| cfg.setCommunicationSpi(new TcpCommunicationSpi()); |
| |
| TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); |
| DiscoverySpi dspi = new DiscoverySpi(); |
| |
| dspi.setIpFinder(spi.getIpFinder()); |
| |
| cfg.setDiscoverySpi(dspi); |
| |
| dspi.setJoinTimeout(60_000); |
| dspi.setClientReconnectDisabled(clientReconnectDisabled); |
| } |
| |
| // TODO: IGNITE-4833 |
| cfg.setPeerClassLoadingEnabled(false); |
| |
| return cfg; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testClientsReconnectAfterStart() throws Exception { |
| Ignite srv1 = startGrid("server1"); |
| |
| crd = ((IgniteKernal)srv1).localNode(); |
| |
| Ignite srv2 = startGrid("server2"); |
| |
| final CountDownLatch latch = new CountDownLatch(1); |
| |
| List<Ignite> clientNodes = new ArrayList<>(); |
| |
| final int CLIENTS_NUM = 5; |
| |
| for (int i = 0; i < CLIENTS_NUM; i++) |
| clientNodes.add(startClientGrid("client" + i)); |
| |
| blockAll = true; |
| |
| GridTestUtils.runAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| U.sleep(5_000); |
| |
| block = true; |
| blockAll = false; |
| |
| System.out.println(">>> Allow with blocked coordinator."); |
| |
| latch.countDown(); |
| |
| return null; |
| } |
| }); |
| |
| IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| latch.await(); |
| |
| U.sleep((new Random().nextInt(15) + 30) * 1000); |
| |
| block = false; |
| |
| System.out.println(">>> Allow coordinator."); |
| |
| return null; |
| } |
| }); |
| |
| fut.get(); |
| |
| for (Ignite client : clientNodes) { |
| while (true) { |
| try { |
| IgniteCache<Integer, Integer> cache = client.getOrCreateCache("some"); |
| |
| for (int i = 0; i < 100; i++) |
| cache.put(i, i); |
| |
| for (int i = 0; i < 100; i++) |
| assertEquals((Integer)i, cache.get(i)); |
| |
| cache.clear(); |
| |
| break; |
| } |
| catch (IgniteClientDisconnectedException e) { |
| e.reconnectFuture().get(); |
| } |
| } |
| } |
| |
| assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size()); |
| assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size()); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testClientsReconnect() throws Exception { |
| Ignite srv1 = startGrid("server1"); |
| |
| crd = ((IgniteKernal)srv1).localNode(); |
| |
| Ignite srv2 = startGrid("server2"); |
| |
| block = true; |
| |
| List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>(); |
| |
| final CountDownLatch latch = new CountDownLatch(1); |
| |
| final int CLIENTS_NUM = 5; |
| |
| for (int i = 0; i < CLIENTS_NUM; i++) { |
| final int idx = i; |
| |
| IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() { |
| @Override public Ignite call() throws Exception { |
| latch.await(); |
| |
| String nodeName = "client" + idx; |
| |
| IgniteConfiguration cfg = getConfiguration(nodeName) |
| .setFailureHandler(new AbstractFailureHandler() { |
| @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { |
| // This should _not_ fire when exchange-worker terminates before reconnect. |
| Runtime.getRuntime().halt(Ignition.KILL_EXIT_CODE); |
| |
| return false; |
| } |
| }); |
| |
| return startClientGrid(nodeName, optimize(cfg)); |
| } |
| }); |
| |
| futs.add(fut); |
| } |
| |
| GridTestUtils.runAsync(new Callable<Boolean>() { |
| @Override public Boolean call() throws Exception { |
| latch.countDown(); |
| |
| Random rnd = new Random(); |
| |
| U.sleep((rnd.nextInt(15) + 15) * 1000); |
| |
| block = false; |
| |
| System.out.println(">>> ALLOW connection to coordinator."); |
| |
| return true; |
| } |
| }); |
| |
| for (IgniteInternalFuture<Ignite> clientFut : futs) { |
| Ignite client = clientFut.get(); |
| |
| IgniteCache<Integer, Integer> cache = client.getOrCreateCache(client.name()); |
| |
| for (int i = 0; i < 100; i++) |
| cache.put(i, i); |
| |
| for (int i = 0; i < 100; i++) |
| assert i == cache.get(i); |
| } |
| |
| assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size()); |
| assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size()); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testClientsReconnectDisabled() throws Exception { |
| clientReconnectDisabled = true; |
| |
| Ignite srv1 = startGrid("server1"); |
| |
| if (!tcpDiscovery()) |
| return; |
| |
| crd = ((IgniteKernal)srv1).localNode(); |
| |
| Ignite srv2 = startGrid("server2"); |
| |
| block = true; |
| |
| List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>(); |
| |
| final CountDownLatch latch = new CountDownLatch(1); |
| |
| final int CLIENTS_NUM = 5; |
| |
| final CountDownLatch failureHndLatch = new CountDownLatch(CLIENTS_NUM); |
| |
| for (int i = 0; i < CLIENTS_NUM; i++) { |
| final int idx = i; |
| |
| IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() { |
| @Override public Ignite call() throws Exception { |
| latch.await(); |
| |
| String igniteInstanceName = "client" + idx; |
| |
| return startClientGrid(igniteInstanceName, getConfiguration(igniteInstanceName) |
| .setFailureHandler(new TestFailureHandler(true, failureHndLatch))); |
| } |
| }); |
| |
| futs.add(fut); |
| } |
| |
| latch.countDown(); |
| |
| for (final IgniteInternalFuture<Ignite> clientFut : futs) { |
| //noinspection ThrowableNotThrown |
| GridTestUtils.assertThrows(log, new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| clientFut.get(); |
| |
| return null; |
| } |
| }, IgniteCheckedException.class, null); |
| } |
| |
| assertTrue(failureHndLatch.await(1000, TimeUnit.MILLISECONDS)); |
| |
| assertEquals(0, srv1.cluster().forClients().nodes().size()); |
| assertEquals(0, srv2.cluster().forClients().nodes().size()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected long getTestTimeout() { |
| return 3 * 60_000; |
| } |
| |
| /** |
| * |
| */ |
| private class TcpCommunicationSpi extends org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi { |
| /** {@inheritDoc} */ |
| @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { |
| if (blockAll || block && node.id().equals(crd.id())) |
| throw new IgniteSpiException(new SocketException("Test communication exception")); |
| |
| super.sendMessage(node, msg); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void sendMessage(ClusterNode node, Message msg, |
| IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { |
| if (blockAll || block && node.id().equals(crd.id())) |
| throw new IgniteSpiException(new SocketException("Test communication exception")); |
| |
| super.sendMessage(node, msg, ackC); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private class DiscoverySpi extends TcpDiscoverySpi { |
| /** {@inheritDoc} */ |
| @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, |
| long timeout) throws IOException { |
| if (blockAll || block && sock.getPort() == 47500) |
| throw new SocketException("Test discovery exception"); |
| |
| super.writeToSocket(sock, msg, data, timeout); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, |
| long timeout) throws IOException, IgniteCheckedException { |
| if (blockAll || block && sock.getPort() == 47500) |
| throw new SocketException("Test discovery exception"); |
| |
| super.writeToSocket(sock, msg, timeout); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, |
| long timeout) throws IOException, IgniteCheckedException { |
| if (blockAll || block && sock.getPort() == 47500) |
| throw new SocketException("Test discovery exception"); |
| |
| super.writeToSocket(sock, out, msg, timeout); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, |
| long timeout) throws IOException { |
| if (blockAll || block && sock.getPort() == 47500) |
| throw new SocketException("Test discovery exception"); |
| |
| super.writeToSocket(msg, sock, res, timeout); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr, |
| IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException { |
| if (blockAll || block && sock.getPort() == 47500) |
| throw new SocketException("Test discovery exception"); |
| |
| return super.openSocket(sock, remAddr, timeoutHelper); |
| } |
| } |
| } |