// blob: d7611842ab89b8e9a5befffb0e953f2c34a2786f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.thin;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.client.ClientAddressFinder;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.junit.Test;
import static org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT;
import static org.apache.ignite.internal.processors.odbc.ClientListenerProcessor.CLIENT_LISTENER_PORT;
/**
 * Tests partition awareness of the thin client while the cluster topology changes and the client
 * obtains server addresses through a {@link ClientAddressFinder}.
 */
public class ThinClientPartitionAwarenessDiscoveryTest extends ThinClientAbstractPartitionAwarenessTest {
    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        try {
            super.afterTest();
        }
        finally {
            // Stop the nodes even when the parent cleanup fails, otherwise they leak into the next test.
            stopAllGrids();
        }
    }

    /**
     * Tests that the client uses channels to all running nodes while new nodes start.
     * <p>
     * Nodes are started one by one. The client is created after the first node is up; after every
     * subsequent start the client is forced to detect the topology change and the test waits for
     * the channels to all running nodes before checking request routing.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testClientDiscoveryNodesJoin() throws Exception {
        for (int i = 0; i < MAX_CLUSTER_SIZE; ++i) {
            startGrid(i);

            awaitPartitionMapExchange();

            // Nodes with indexes 0..i have been started by now.
            int[] workChannels = IntStream.rangeClosed(0, i).toArray();

            if (i == 0)
                initClient(getClientConfigurationWithDiscovery(), workChannels);
            else {
                detectTopologyChange();

                awaitChannelsInit(workChannels);
            }

            testPartitionAwareness(workChannels);
        }
    }

    /**
     * Tests that the client uses channels to all running nodes while nodes stop.
     * <p>
     * The whole cluster is started first, then nodes are stopped one by one starting from the
     * highest index; node 0 is never stopped.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testClientDiscoveryNodesLeave() throws Exception {
        startGrids(MAX_CLUSTER_SIZE);

        awaitPartitionMapExchange();

        // Channels to every started node are expected, so derive the indexes from the cluster size.
        initClient(getClientConfigurationWithDiscovery(), IntStream.range(0, MAX_CLUSTER_SIZE).toArray());

        detectTopologyChange();

        for (int i = MAX_CLUSTER_SIZE - 1; i > 0; i--) {
            // Nodes with indexes 0..i-1 remain after node i is stopped.
            int[] workChannels = IntStream.range(0, i).toArray();

            // Drop the reference to the channel of the node that is about to be stopped.
            channels[i] = null;

            stopGrid(i);

            awaitPartitionMapExchange();

            detectTopologyChange();

            testPartitionAwareness(workChannels);
        }
    }

    /**
     * Tests that the client uses channels to configured nodes only while more nodes run.
     * <p>
     * The address finder is configured to hide the last node of the cluster; that node is started
     * after the client has been initialized, and requests must still be routed only through the
     * channels to the other nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testClientDiscoveryFilterNodeJoin() throws Exception {
        // Index of the node that is filtered out by the address finder and joins later.
        int filteredIdx = MAX_CLUSTER_SIZE - 1;

        int[] workChannels = IntStream.range(0, filteredIdx).toArray();

        startGrids(filteredIdx);

        awaitPartitionMapExchange();

        initClient(getClientConfigurationWithDiscovery(filteredIdx), workChannels);

        startGrid(filteredIdx);

        awaitPartitionMapExchange();

        detectTopologyChange();

        testPartitionAwareness(workChannels);
    }

    /**
     * Checks that each request goes to the right node.
     * <p>
     * Puts keys {@code 0..KEY_CNT-1} through the thin client and asserts that every put was observed
     * on the channel returned by {@code affinityChannel} for that key, that this channel is one of
     * the expected ones, and that in the end every expected channel served at least one put.
     *
     * @param chIdxs Indexes in {@code channels} of the channels that are expected to serve requests.
     */
    private void testPartitionAwareness(int... chIdxs) {
        ClientCache<Object, Object> clientCache = client.cache(PART_CACHE_NAME);
        IgniteInternalCache<Object, Object> igniteCache = grid(0).context().cache().cache(PART_CACHE_NAME);

        // Expected channel -> whether at least one operation has been observed on it.
        Map<TestTcpClientChannel, Boolean> channelHits = Arrays.stream(chIdxs).boxed()
            .collect(Collectors.toMap(idx -> channels[idx], idx -> false));

        for (int i = 0; i < KEY_CNT; i++) {
            TestTcpClientChannel opCh = affinityChannel(i, igniteCache);

            clientCache.put(i, i);

            // The very first put is expected to be accompanied by a partitions request on the default channel.
            if (i == 0)
                assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);

            assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);

            assertTrue(channelHits.containsKey(opCh));

            channelHits.put(opCh, true);
        }

        assertFalse(channelHits.containsValue(false));
    }

    /**
     * Provides a {@link ClientConfiguration} with partition awareness enabled and an address finder
     * that returns the local addresses of all nodes currently started in this JVM.
     * <p>
     * A node is skipped when the offset of its client listener port from {@code DFLT_PORT} is one of
     * {@code excludeIdx}.
     * NOTE(review): this relies on the node with index {@code i} listening on {@code DFLT_PORT + i} -
     * confirm against the node configuration used by the parent test.
     *
     * @param excludeIdx Indexes of the nodes whose addresses must not be reported to the client.
     * @return Client configuration.
     */
    private ClientConfiguration getClientConfigurationWithDiscovery(int... excludeIdx) {
        Set<Integer> exclude = Arrays.stream(excludeIdx).boxed().collect(Collectors.toSet());

        // The finder is evaluated lazily, so every call reflects the nodes running at that moment.
        ClientAddressFinder addrFinder = () ->
            IgnitionEx.allGrids().stream().map(node -> {
                int port = (Integer)node.cluster().localNode().attributes().get(CLIENT_LISTENER_PORT);

                if (exclude.contains(port - DFLT_PORT))
                    return null;

                return "127.0.0.1:" + port;
            })
                .filter(Objects::nonNull)
                .toArray(String[]::new);

        return new ClientConfiguration()
            .setAddressesFinder(addrFinder)
            .setPartitionAwarenessEnabled(true);
    }

    /**
     * Triggers the client to detect a topology change.
     */
    private void detectTopologyChange() {
        // Send non-affinity request to detect topology change.
        initDefaultChannel();
    }
}