| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.client.thin; |
| |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.client.ClientAuthenticationException; |
| import org.apache.ignite.client.ClientClusterGroup; |
| import org.apache.ignite.client.ClientConnectionException; |
| import org.apache.ignite.client.ClientException; |
| import org.apache.ignite.client.ClientServices; |
| import org.apache.ignite.client.IgniteClient; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.ClientConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.internal.GridJobExecuteRequest; |
| import org.apache.ignite.internal.GridTopic; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer; |
| import org.apache.ignite.internal.managers.communication.GridMessageListener; |
| import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; |
| import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.service.GridServiceProxy; |
| import org.apache.ignite.internal.processors.service.ServiceClusterDeploymentResultBatch; |
| import org.apache.ignite.internal.util.GridConcurrentHashSet; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.services.ServiceConfiguration; |
| import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; |
| import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; |
| import org.apache.ignite.testframework.ListeningTestLogger; |
| import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger; |
| import org.apache.logging.log4j.Level; |
| import org.jetbrains.annotations.Nullable; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.testframework.GridTestUtils.assertThrows; |
| import static org.apache.ignite.testframework.GridTestUtils.runAsync; |
| import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync; |
| import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; |
| |
| /** |
| * Checks the service awareness feature of the thin client. |
| */ |
| public class ServiceAwarenessTest extends AbstractThinClientTest { |
| /** Node-filter service name. */ |
| private static final String SRV_NAME = "node_filtered_svc"; |
| |
| /** Number of grids at the test start. */ |
| private static final int GRIDS = 4; |
| |
| /** Number of node instances with the initial service deployment. */ |
| private static final int INIT_SRVC_NODES_CNT = 2; |
| |
| /** */ |
| protected boolean partitionAwareness = true; |
| |
| /** */ |
| private static ListeningTestLogger clientLogLsnr; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| cfg.setDiscoverySpi(new TestBlockingDiscoverySpi()); |
| |
| return cfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected ClientConfiguration getClientConfiguration() { |
| ClientConfiguration ccfg = super.getClientConfiguration(); |
| |
| ccfg.setLogger(clientLogLsnr); |
| |
| return ccfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTestsStarted() throws Exception { |
| super.beforeTestsStarted(); |
| |
| clientLogLsnr = new ListeningTestLogger(log); |
| } |
| |
| /** */ |
| private static ServiceConfiguration serviceCfg() { |
| // Service is deployed on nodes with the name index equal to 1, 2 or >= GRIDS. |
| return new ServiceConfiguration() |
| .setName(SRV_NAME) |
| .setService(new ServicesTest.TestService()) |
| .setMaxPerNodeCount(1) |
| .setNodeFilter(new TestNodeFilter()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected boolean isClientPartitionAwarenessEnabled() { |
| return partitionAwareness; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| super.afterTest(); |
| |
| ((GridTestLog4jLogger)log).setLevel(Level.INFO); |
| |
| stopAllGrids(); |
| |
| partitionAwareness = true; |
| |
| clientLogLsnr.clearListeners(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| super.beforeTest(); |
| |
| startGrids(GRIDS); |
| |
| grid(1).services().deploy(serviceCfg()); |
| } |
| |
| /** */ |
| @Test |
| public void testDelayedServiceRedeploy() throws Exception { |
| TestBlockingDiscoverySpi testDisco = ((TestBlockingDiscoverySpi)grid(0).configuration().getDiscoverySpi()); |
| |
| // Service topology on the client. |
| Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>(); |
| |
| addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll); |
| |
| AtomicBoolean svcRunFlag = new AtomicBoolean(true); |
| |
| try (IgniteClient client = startClient()) { |
| ServicesTest.TestServiceInterface svc = client.services().serviceProxy(SRV_NAME, ServicesTest.TestServiceInterface.class); |
| |
| runAsync(() -> { |
| while (svcRunFlag.get()) |
| svc.testMethod(); |
| }); |
| |
| waitForCondition(() -> srvcTopOnClient.size() == INIT_SRVC_NODES_CNT |
| && srvcTopOnClient.contains(grid(1).localNode().id()) |
| && srvcTopOnClient.contains(grid(2).localNode().id()), |
| getTestTimeout()); |
| |
| // Delays service redeployment and the service topology update on the server side. |
| testDisco.toBlock.add(ServiceClusterDeploymentResultBatch.class); |
| |
| startGrid(GRIDS); |
| |
| waitForCondition(() -> testDisco.blocked.size() == 1, getTestTimeout()); |
| |
| // Ensure all the nodes have started but the service topology hasn't updated yet. |
| for (Ignite ig : G.allGrids()) { |
| assertEquals(ig.cluster().nodes().size(), GRIDS + 1); |
| |
| // Ensure there are still SRVC_FILTERED_NOIDES_CNT nodes with the service instance. |
| assertEquals(((IgniteEx)ig).context().service().serviceTopology(SRV_NAME, 0).size(), |
| INIT_SRVC_NODES_CNT); |
| } |
| |
| // Ensure the client's topology is not updated. |
| assertTrue(srvcTopOnClient.size() == INIT_SRVC_NODES_CNT |
| && !srvcTopOnClient.contains(grid(GRIDS).localNode().id())); |
| |
| testDisco.release(); |
| |
| // Ensure the service topology has been updated to 3 instances per cluster. |
| for (Ignite ig : G.allGrids()) { |
| waitForCondition( |
| () -> { |
| try { |
| return ((IgniteEx)ig).context().service().serviceTopology(SRV_NAME, 0).size() == 3; |
| } |
| catch (Exception e) { |
| return false; |
| } |
| }, |
| getTestTimeout() |
| ); |
| } |
| |
| waitForCondition(() -> srvcTopOnClient.size() == 3 && srvcTopOnClient.contains(grid(1).localNode().id()) |
| && srvcTopOnClient.contains(grid(2).localNode().id()) |
| && srvcTopOnClient.contains(grid(GRIDS).localNode().id()), getTestTimeout()); |
| } |
| finally { |
| svcRunFlag.set(false); |
| } |
| } |
| |
| /** |
| * Tests several nodes come while one thread is used to call the service. |
| */ |
| @Test |
| public void testNodesJoinSingleThreaded() throws Exception { |
| doTestClusterTopChangesWhileServiceCalling(3, true, false); |
| } |
| |
| /** |
| * Tests several nodes come while several threads are used to call the service. |
| */ |
| @Test |
| public void testNodesJoinMultiThreaded() throws Exception { |
| doTestClusterTopChangesWhileServiceCalling(3, true, true); |
| } |
| |
| /** |
| * Tests several nodes leaves while one thread is used to call the service. |
| */ |
| @Test |
| public void testNodesLeaveSingleThreaded() throws Exception { |
| doTestClusterTopChangesWhileServiceCalling(3, false, false); |
| } |
| |
| /** |
| * Tests several nodes leave while several threads are used to call the service. |
| */ |
| @Test |
| public void testNodesLeaveMultiThreaded() throws Exception { |
| doTestClusterTopChangesWhileServiceCalling(3, false, true); |
| } |
| |
| /** |
| * Tests change of the minor cluster topology version doesn't trigger the service topology update. |
| */ |
| @Test |
| public void testMinorTopologyVersionDoesntAffect() throws Exception { |
| try (IgniteClient client = startClient()) { |
| ServicesTest.TestServiceInterface svc = client.services().serviceProxy(SRV_NAME, ServicesTest.TestServiceInterface.class); |
| |
| Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>(); |
| |
| addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll); |
| |
| ((GridTestLog4jLogger)log).setLevel(Level.DEBUG); |
| |
| while (srvcTopOnClient.isEmpty()) |
| svc.testMethod(); |
| |
| // Last time ot the topology update. |
| long time = System.nanoTime(); |
| |
| srvcTopOnClient.clear(); |
| |
| AffinityTopologyVersion prevTopVer = grid(0).context().discovery().topologyVersionEx(); |
| |
| grid(0).createCache("testCache"); |
| |
| awaitPartitionMapExchange(); |
| |
| AffinityTopologyVersion newTopVer = grid(0).context().discovery().topologyVersionEx(); |
| |
| assertTrue(newTopVer.topologyVersion() == prevTopVer.topologyVersion() |
| && newTopVer.minorTopologyVersion() > prevTopVer.minorTopologyVersion()); |
| |
| while (srvcTopOnClient.isEmpty()) |
| svc.testMethod(); |
| |
| // Update only by the timeout. |
| assertTrue(U.nanosToMillis(System.nanoTime() - time) > ClientServicesImpl.SRV_TOP_UPDATE_PERIOD / 2); |
| } |
| } |
| |
| /** |
| * Tests the service topology update with a gap of service invocation during forced service redeployment. |
| */ |
| @Test |
| public void testForcedServiceRedeployWhileClientIsIdle() throws Exception { |
| try (IgniteClient client = startClient()) { |
| ServicesTest.TestServiceInterface svc = client.services().serviceProxy(SRV_NAME, ServicesTest.TestServiceInterface.class); |
| |
| Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>(); |
| |
| addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll); |
| |
| ((GridTestLog4jLogger)log).setLevel(Level.DEBUG); |
| |
| assertTrue(waitForCondition(() -> { |
| svc.testMethod(); |
| |
| return !srvcTopOnClient.isEmpty(); |
| }, ClientServicesImpl.SRV_TOP_UPDATE_PERIOD)); |
| |
| ((GridTestLog4jLogger)log).setLevel(Level.INFO); |
| |
| assertTrue(srvcTopOnClient.size() == INIT_SRVC_NODES_CNT |
| && srvcTopOnClient.contains(grid(1).localNode().id()) |
| && srvcTopOnClient.contains(grid(2).localNode().id())); |
| |
| long prevTopVer = grid(0).context().discovery().topologyVersion(); |
| |
| grid(1).services().cancel(SRV_NAME); |
| |
| waitForCondition(() -> grid(0).services().serviceDescriptors().isEmpty(), getTestTimeout()); |
| |
| srvcTopOnClient.clear(); |
| |
| grid(1).services().deploy(serviceCfg().setNodeFilter(null)); |
| |
| waitForCondition(() -> !grid(0).services().serviceDescriptors().isEmpty(), getTestTimeout()); |
| |
| assertEquals(prevTopVer, grid(0).context().discovery().topologyVersion()); |
| |
| ((GridTestLog4jLogger)log).setLevel(Level.DEBUG); |
| |
| waitForCondition(() -> { |
| svc.testMethod(); |
| |
| return srvcTopOnClient.size() == GRIDS; |
| }, getTestTimeout()); |
| |
| for (Ignite ig : G.allGrids()) |
| assertTrue(srvcTopOnClient.contains(ig.cluster().localNode().id())); |
| } |
| } |
| |
| /** */ |
| private void doTestClusterTopChangesWhileServiceCalling( |
| int nodesCnt, |
| boolean addNodes, |
| boolean multiThreaded) |
| throws Exception { |
| assert nodesCnt > 0; |
| |
| Set<UUID> newNodesUUIDs = new GridConcurrentHashSet<>(); |
| |
| // Start additional nodes to stop them. |
| if (!addNodes) { |
| startGridsMultiThreaded(GRIDS, nodesCnt); |
| |
| for (int i = GRIDS; i < GRIDS + nodesCnt; ++i) |
| newNodesUUIDs.add(grid(i).localNode().id()); |
| } |
| |
| // Service topology on the clients. |
| Set<UUID> srvcTopOnClient = new GridConcurrentHashSet<>(); |
| |
| addSrvcTopUpdateClientLogLsnr(srvcTopOnClient::addAll); |
| |
| AtomicBoolean changeClusterTop = new AtomicBoolean(); |
| AtomicBoolean stopFlag = new AtomicBoolean(); |
| |
| try (IgniteClient client = startClient()) { |
| ServicesTest.TestServiceInterface svc = client.services().serviceProxy(SRV_NAME, ServicesTest.TestServiceInterface.class); |
| |
| ((GridTestLog4jLogger)log).setLevel(Level.DEBUG); |
| |
| IgniteInternalFuture<?> runFut = runMultiThreadedAsync(() -> { |
| do { |
| try { |
| svc.testMethod(); |
| } |
| catch (ClientException e) { |
| String m = e.getMessage(); |
| |
| // TODO: IGNITE-20802 : Exception should not occur. |
| // Client doesn't retry service invocation if the redirected-to service instance node leaves cluster. |
| if (addNodes || (!m.contains("Node has left grid") && !m.contains("Failed to send job due to node failure")) |
| || newNodesUUIDs.stream().noneMatch(nid -> m.contains(nid.toString()))) |
| throw e; |
| } |
| } |
| while (!stopFlag.get()); |
| }, multiThreaded ? 4 : 1, "ServiceTestLoader"); |
| |
| while (!stopFlag.get()) { |
| // Wait until the initial topology is received. |
| if (srvcTopOnClient.size() == (addNodes ? INIT_SRVC_NODES_CNT : INIT_SRVC_NODES_CNT + nodesCnt) |
| && changeClusterTop.compareAndSet(false, true)) { |
| srvcTopOnClient.clear(); |
| |
| for (int i = 0; i < nodesCnt; ++i) { |
| int nodeIdx = GRIDS + i; |
| |
| runAsync(() -> { |
| try { |
| if (addNodes) |
| newNodesUUIDs.add(startGrid(nodeIdx).localNode().id()); |
| else |
| stopGrid(nodeIdx); |
| } |
| catch (Exception e) { |
| log.error("Unable to start or stop test grid.", e); |
| |
| stopFlag.set(true); |
| } |
| }); |
| } |
| } |
| |
| // Stop if new excepted service topology received. |
| if (srvcTopOnClient.size() == (addNodes ? INIT_SRVC_NODES_CNT + nodesCnt : INIT_SRVC_NODES_CNT)) |
| stopFlag.set(true); |
| |
| Thread.sleep(10); |
| } |
| |
| runFut.get(); |
| } |
| |
| // The initial nodes must always persist it the service topology. |
| assertTrue(srvcTopOnClient.contains(grid(1).localNode().id()) |
| && srvcTopOnClient.contains(grid(2).localNode().id())); |
| |
| assertEquals(addNodes ? nodesCnt : 0, newNodesUUIDs.stream().filter(srvcTopOnClient::contains).count()); |
| } |
| |
| /** |
| * Tests that the client invokes only the proper nodes when partitionAwareness is enabled and no |
| * {@link ClientClusterGroup} is set. |
| */ |
| @Test |
| public void testWithNoSubCluster() { |
| doTestServiceAwarenessForClusterGroup(null); |
| } |
| |
| /** |
| * Tests that the client invokes only the proper node if partitionAwareness is enabled and just one correct server |
| * is passed as {@link ClientClusterGroup}. |
| */ |
| @Test |
| public void testWithOneCorrectServer() { |
| doTestServiceAwarenessForClusterGroup(Collections.singletonList(grid(1).localNode().id())); |
| } |
| |
| /** |
| * Tests that the client invokes only the proper nodes if partitionAwareness is enabled and just a couple of correct |
| * servers are passed as {@link ClientClusterGroup} to invoke the service on. |
| */ |
| @Test |
| public void testWithTwoCorrectServers() { |
| doTestServiceAwarenessForClusterGroup(Arrays.asList(grid(1).localNode().id(), grid(2).localNode().id())); |
| } |
| |
| /** |
| * Tests that the client invokes only the proper node if partitionAwareness is enabled and one correct |
| * server and one incorrect server (having no service instance) are passed as {@link ClientClusterGroup} to invoke |
| * the service on. |
| */ |
| @Test |
| public void testWithOneCorrectOneIncorrectServers() { |
| doTestServiceAwarenessForClusterGroup(Arrays.asList(grid(0).localNode().id(), grid(2).localNode().id())); |
| } |
| |
| /** |
| * Tests that the client invokes only the proper node if partitionAwareness is enabled and only incorrect |
| * server (having no service instance) are passed as {@link ClientClusterGroup} to invoke the service on. |
| */ |
| @Test |
| public void testWithIncorrectServer() { |
| doTestServiceAwarenessForClusterGroup(Collections.singletonList(grid(0).localNode().id())); |
| } |
| |
| /** */ |
| private void doTestServiceAwarenessForClusterGroup(@Nullable Collection<UUID> grp) { |
| // Counters of the invocation redirects. |
| AtomicInteger redirectCnt = new AtomicInteger(); |
| |
| // Service topology received by the client. |
| Set<UUID> top = new GridConcurrentHashSet<>(); |
| |
| // Requested server nodes with a service invocation. |
| Collection<UUID> requestedServers = new GridConcurrentHashSet<>(); |
| |
| // All or properly filtered nodes with the service instances. |
| Set<UUID> filteredGrp = F.isEmpty(grp) |
| ? top |
| : grp.stream().filter(nid -> new TestNodeFilter().apply(grid(0).cluster().node(nid))).collect(Collectors.toSet()); |
| |
| addSrvcTopUpdateClientLogLsnr(uuids -> { |
| // Reset counters on the first topology update. |
| if (top.isEmpty()) |
| redirectCnt.set(0); |
| |
| top.addAll(uuids); |
| }); |
| |
| // Listener of the service remote call (the redirection). |
| G.allGrids().forEach(g -> ((IgniteEx)g).context().io().addMessageListener(GridTopic.TOPIC_JOB, new GridMessageListener() { |
| @Override public void onMessage(UUID nodeId, Object msg, byte plc) { |
| if (msg instanceof GridJobExecuteRequest |
| && ((GridJobExecuteRequest)msg).getTaskClassName().contains(GridServiceProxy.class.getName())) |
| redirectCnt.incrementAndGet(); |
| } |
| })); |
| |
| partitionAwareness = false; |
| |
| ((GridTestLog4jLogger)log).setLevel(Level.DEBUG); |
| |
| callService(requestedServers, grp, filteredGrp, null); |
| |
| // Check no service awareness: continous redirections. |
| assertEquals(F.isEmpty(filteredGrp) && !F.isEmpty(grp) ? 0 : 100, redirectCnt.get()); |
| |
| // Ensure that client received no service topology update. |
| assertTrue(top.isEmpty()); |
| |
| assertTrue(requestedServers.size() == 1 && requestedServers.contains(grid(0).localNode().id())); |
| |
| partitionAwareness = true; |
| |
| callService(requestedServers, grp, filteredGrp, svc -> { |
| // We assume that the topology will be received and used for the further requests. |
| redirectCnt.set(0); |
| requestedServers.clear(); |
| |
| for (int i = 0; i < 1000; ++i) |
| svc.testMethod(); |
| }); |
| |
| // Ensure that only the target nodes were requested after the topology getting. |
| if (!F.isEmpty(filteredGrp)) |
| assertEquals(filteredGrp, requestedServers); |
| |
| // Check the received topology. |
| assertFalse(top.retainAll(filteredGrp)); |
| |
| // Ensure there were no redirected sertvic calls any more. |
| assertEquals(0, redirectCnt.get()); |
| } |
| |
| /** |
| * Invokes the test service 100 times. If required, stores requestes server nodes. Excepts an exception if the |
| * target nodes have no service instance. |
| * |
| * @param requestedEndpoints If not {@code null}, is filled with actually requested nodes. |
| * @param svcGrp If not {@code null}, is used to pass as a nodes group to call the service on it. |
| * @param filteredSvcGrp If not {@code null}, actual nodes group with the service instances. |
| * @param afterCallAction If not {@code null}, is invoked after the service calls. |
| */ |
| private void callService( |
| @Nullable Collection<UUID> requestedEndpoints, |
| @Nullable Collection<UUID> svcGrp, |
| @Nullable Collection<UUID> filteredSvcGrp, |
| @Nullable Consumer<ServicesTest.TestServiceInterface> afterCallAction |
| ) { |
| try (IgniteClient client = startClient(requestedEndpoints)) { |
| ClientServices clientServices = F.isEmpty(svcGrp) |
| ? client.services() |
| : client.services(client.cluster().forNodeIds(svcGrp)); |
| |
| ServicesTest.TestServiceInterface svc = clientServices.serviceProxy(SRV_NAME, ServicesTest.TestServiceInterface.class); |
| |
| if (F.isEmpty(filteredSvcGrp) && !F.isEmpty(svcGrp)) |
| assertThrows(null, () -> svc.testMethod(), ClientException.class, "Failed to find deployed service:"); |
| else { |
| for (int i = 0; i < 100; ++i) |
| svc.testMethod(); |
| |
| if (afterCallAction != null) |
| afterCallAction.accept(svc); |
| } |
| } |
| } |
| |
| /** Extracts ids of received service instance nodes from the client log. */ |
| private static void addSrvcTopUpdateClientLogLsnr(Consumer<Set<UUID>> srvTopConsumer) { |
| clientLogLsnr.registerListener(s -> { |
| if (s.contains("Topology of service '" + SRV_NAME + "' has been updated. The service instance nodes: ")) { |
| String nodes = s.substring(s.lastIndexOf(": [") + 3, s.length() - 2); |
| |
| srvTopConsumer.accept(Arrays.stream(nodes.split(", ")).map(UUID::fromString).collect(Collectors.toSet())); |
| } |
| }); |
| } |
| |
| /** */ |
| private IgniteClient startClient() { |
| return new TcpIgniteClient((cfg, hnd) -> new TestTcpChannel(cfg, hnd, null), |
| getClientConfiguration(grid(0))); |
| } |
| |
| /** */ |
| private IgniteClient startClient(@Nullable Collection<UUID> requestedServerNodes) { |
| return new TcpIgniteClient((cfg, hnd) -> new TestTcpChannel(cfg, hnd, requestedServerNodes), |
| getClientConfiguration(grid(0))); |
| } |
| |
| /** |
| * Accepts nodes with the name index equal to 1, 2 or >= GRIDS. |
| */ |
| private static final class TestNodeFilter implements IgnitePredicate<ClusterNode> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** {@inheritDoc} */ |
| @Override public boolean apply(ClusterNode node) { |
| String nodeName = node.attribute("org.apache.ignite.ignite.name"); |
| |
| if (F.isEmpty(nodeName)) |
| return false; |
| |
| int nodeIdx = -1; |
| |
| try { |
| nodeIdx = Integer.parseInt(nodeName.substring(nodeName.length() - 1)); |
| } |
| catch (Exception e) { |
| // No-op. |
| } |
| |
| return nodeIdx == 1 || nodeIdx == 2 || nodeIdx >= GRIDS; |
| } |
| } |
| |
| /** */ |
| private static final class TestBlockingDiscoverySpi extends TcpDiscoverySpi { |
| /** */ |
| private final Set<Class<? extends DiscoveryCustomMessage>> toBlock = new HashSet<>(); |
| |
| /** */ |
| private final List<CustomMessageWrapper> blocked = new CopyOnWriteArrayList<>(); |
| |
| /** {@inheritDoc} */ |
| @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { |
| if (msg instanceof CustomMessageWrapper |
| && toBlock.stream().anyMatch(mt -> mt.isAssignableFrom(((CustomMessageWrapper)msg).delegate().getClass()))) { |
| blocked.add((CustomMessageWrapper)msg); |
| |
| return; |
| } |
| |
| super.sendCustomEvent(msg); |
| } |
| |
| /** */ |
| public void release() { |
| toBlock.clear(); |
| |
| blocked.forEach(this::sendCustomEvent); |
| } |
| } |
| |
| /** |
| * A client connection channel able to register the server nodes requested to call a service. |
| */ |
| private static final class TestTcpChannel extends TcpClientChannel { |
| /** */ |
| private final @Nullable Collection<UUID> requestedServerNodes; |
| |
| /** Ctor. */ |
| private TestTcpChannel( |
| ClientChannelConfiguration cfg, |
| ClientConnectionMultiplexer connMgr, |
| @Nullable Collection<UUID> requestedServerNodes) |
| throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError { |
| super(cfg, connMgr); |
| |
| this.requestedServerNodes = requestedServerNodes; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> T service(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter, |
| Function<PayloadInputChannel, T> payloadReader) throws ClientException { |
| UUID srvNodeId = serverNodeId(); |
| |
| if (op == ClientOperation.SERVICE_INVOKE && requestedServerNodes != null && srvNodeId != null) |
| requestedServerNodes.add(srvNodeId); |
| |
| return super.service(op, payloadWriter, payloadReader); |
| } |
| } |
| } |