blob: 5d8c34c8bb95e29258fd96734a24871d6a3e40eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.thin;
import java.util.Arrays;
import java.util.Collection;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT;
/**
* Abstract thin client partition awareness test.
*/
@SuppressWarnings("rawtypes")
public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommonAbstractTest {
/**
 * Wait timeout handed to {@code GridTestUtils.waitForCondition} for each awaited channel
 * (presumably milliseconds - confirm against GridTestUtils).
 */
private static final long WAIT_TIMEOUT = 5_000L;
/** Replicated cache name. */
protected static final String REPL_CACHE_NAME = "replicated_cache";
/** Partitioned cache name. This cache is configured with explicit key configurations for the test key classes. */
protected static final String PART_CACHE_NAME = "partitioned_cache";
/** Partitioned with custom affinity cache name. */
protected static final String PART_CUSTOM_AFFINITY_CACHE_NAME = "partitioned_custom_affinity_cache";
/** Name of a partitioned cache with 0 backups. */
protected static final String PART_CACHE_0_BACKUPS_NAME = "partitioned_0_backup_cache";
/** Name of a partitioned cache with 1 backup. */
protected static final String PART_CACHE_1_BACKUPS_NAME = "partitioned_1_backup_cache";
/** Name of a partitioned cache with 3 backups. */
protected static final String PART_CACHE_3_BACKUPS_NAME = "partitioned_3_backup_cache";
/** Keys count. */
protected static final int KEY_CNT = 30;
/** Max cluster size; also the capacity of the {@link #channels} array. */
protected static final int MAX_CLUSTER_SIZE = 4;
/**
 * Channels, indexed by the offset of the channel's port from {@code DFLT_PORT}.
 * Each slot is filled by the {@link TestTcpClientChannel} constructor.
 */
protected final TestTcpClientChannel[] channels = new TestTcpClientChannel[MAX_CLUSTER_SIZE];
/**
 * Operations queue: (channel, operation) pairs recorded by {@link TestTcpClientChannel}
 * for every request except binary type registration.
 */
protected final Queue<T2<TestTcpClientChannel, ClientOperation>> opsQueue = new ConcurrentLinkedQueue<>();
/**
 * Default channel: the channel that served the probe request sent by {@link #initDefaultChannel()}.
 * Also returned by {@link #nodeChannel(UUID)} when no channel matches the node.
 */
protected TestTcpClientChannel dfltCh;
/** Client instance; assigned when the client is initialized and closed and reset in {@link #afterTest()}. */
protected IgniteClient client;
/**
 * {@inheritDoc}
 * <p>
 * Uses the instance name as consistent ID and registers the full set of test caches
 * (replicated, custom affinity, key-configured, and 0/1/3 backups) on every node.
 */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    IgniteConfiguration nodeCfg = super.getConfiguration(igniteInstanceName);

    nodeCfg.setConsistentId(igniteInstanceName);

    CacheConfiguration replicated = new CacheConfiguration();
    replicated.setName(REPL_CACHE_NAME);
    replicated.setCacheMode(CacheMode.REPLICATED);

    CacheConfiguration customAffinity = new CacheConfiguration();
    customAffinity.setName(PART_CUSTOM_AFFINITY_CACHE_NAME);
    customAffinity.setCacheMode(CacheMode.PARTITIONED);
    customAffinity.setAffinity(new CustomAffinityFunction());

    // Affinity key of the not annotated key class is declared by field name,
    // the annotated one is described by its class.
    CacheConfiguration partitioned = new CacheConfiguration();
    partitioned.setName(PART_CACHE_NAME);
    partitioned.setCacheMode(CacheMode.PARTITIONED);
    partitioned.setKeyConfiguration(
        new CacheKeyConfiguration(TestNotAnnotatedAffinityKey.class.getName(), "affinityKey"),
        new CacheKeyConfiguration(TestAnnotatedAffinityKey.class));

    CacheConfiguration noBackups = new CacheConfiguration();
    noBackups.setName(PART_CACHE_0_BACKUPS_NAME);
    noBackups.setCacheMode(CacheMode.PARTITIONED);
    noBackups.setBackups(0);

    CacheConfiguration oneBackup = new CacheConfiguration();
    oneBackup.setName(PART_CACHE_1_BACKUPS_NAME);
    oneBackup.setCacheMode(CacheMode.PARTITIONED);
    oneBackup.setBackups(1);

    CacheConfiguration threeBackups = new CacheConfiguration();
    threeBackups.setName(PART_CACHE_3_BACKUPS_NAME);
    threeBackups.setCacheMode(CacheMode.PARTITIONED);
    threeBackups.setBackups(3);

    return nodeCfg.setCacheConfiguration(replicated, customAffinity, partitioned, noBackups, oneBackup, threeBackups);
}
/**
 * {@inheritDoc}
 * <p>
 * Drops recorded operations and closes the thin client. The cleanup runs in a {@code finally} block,
 * so the client is released even when the parent tear-down throws.
 */
@Override protected void afterTest() throws Exception {
    try {
        super.afterTest();
    }
    finally {
        opsQueue.clear();

        // closeQuiet is given the possibly-null client, as before.
        U.closeQuiet(client);

        client = null;
    }
}
/**
 * Checks that operation goes through specified channel.
 * <p>
 * Removes the head of {@link #opsQueue} and asserts that it is present and that both its channel
 * and its operation match the expected ones.
 *
 * @param expCh Expected channel.
 * @param expOp Expected operation.
 */
protected void assertOpOnChannel(TestTcpClientChannel expCh, ClientOperation expOp) {
    T2<TestTcpClientChannel, ClientOperation> nextChOp = opsQueue.poll();

    String expected = "[expCh=" + expCh + ", expOp=" + expOp;

    assertNotNull("Unexpected (null) next operation " + expected + ']', nextChOp);

    String details = expected + ", nextOpCh=" + nextChOp + ']';

    assertEquals("Unexpected channel for operation " + details, expCh, nextChOp.get1());
    assertEquals("Unexpected operation on channel " + details, expOp, nextChOp.get2());
}
/**
 * Calculates affinity channel for cache and key.
 * <p>
 * Takes the first node returned by {@code mapKeyToPrimaryAndBackups} for the key and resolves
 * its channel through {@link #nodeChannel(UUID)}.
 *
 * @param key Key to map.
 * @param cache Cache whose affinity is used for the mapping.
 * @return Channel connected to the first mapped node, or the default channel if there is none.
 */
protected TestTcpClientChannel affinityChannel(Object key, IgniteInternalCache<Object, Object> cache) {
    ClusterNode firstMapped = cache.affinity().mapKeyToPrimaryAndBackups(key).iterator().next();

    return nodeChannel(firstMapped.id());
}
/**
 * Calculates channel for node.
 *
 * @param nodeId Server node ID to look up.
 * @return First registered channel whose server node ID equals {@code nodeId},
 *      or the default channel when no registered channel matches.
 */
protected TestTcpClientChannel nodeChannel(UUID nodeId) {
    for (TestTcpClientChannel candidate : channels) {
        // Slots of channels that were never created stay null.
        if (candidate == null)
            continue;

        if (nodeId.equals(candidate.serverNodeId()))
            return candidate;
    }

    return dfltCh;
}
/**
 * Builds a client configuration with partition awareness enabled that targets the given nodes
 * on the loopback address, one port per node index starting from {@code DFLT_PORT}.
 *
 * @param nodeIdxs Node indexes to connect to.
 * @return Client configuration.
 */
protected ClientConfiguration getClientConfiguration(int... nodeIdxs) {
    String[] endpoints = new String[nodeIdxs.length];

    for (int i = 0; i < nodeIdxs.length; i++)
        endpoints[i] = "127.0.0.1:" + (DFLT_PORT + nodeIdxs[i]);

    ClientConfiguration clientCfg = new ClientConfiguration();

    clientCfg.setAddresses(endpoints);
    clientCfg.setPartitionAwarenessEnabled(true);

    return clientCfg;
}
/**
 * Creates the thin client with a channel factory that produces {@link TestTcpClientChannel}s,
 * waits for the requested channels to appear and then determines the default channel.
 *
 * @param clientCfg Client configuration.
 * @param chIdxs Channels to wait for initialization.
 * @throws IgniteInterruptedCheckedException If waiting for channels was interrupted.
 */
protected void initClient(ClientConfiguration clientCfg, int... chIdxs) throws IgniteInterruptedCheckedException {
    client = new TcpIgniteClient((chCfg, multiplexer) -> {
        try {
            log.info("Establishing connection to " + chCfg.getAddress());

            TcpClientChannel created = new TestTcpClientChannel(chCfg, multiplexer);

            log.info("Channel initialized: " + created);

            return created;
        }
        catch (Exception err) {
            // Log and propagate the failure unchanged.
            log.warning("Failed to initialize channel: " + err.getMessage());

            throw err;
        }
    }, clientCfg);

    awaitChannelsInit(chIdxs);

    initDefaultChannel();
}
/**
 * Determines the default channel.
 * <p>
 * Clears the recorded operations, sends a request that is not bound to any key, and remembers
 * the channel that served it in {@link #dfltCh}. The probe record is removed from the queue.
 */
protected void initDefaultChannel() {
    opsQueue.clear();

    // Send non-affinity request to determine default channel.
    client.getOrCreateCache(REPL_CACHE_NAME);

    T2<TestTcpClientChannel, ClientOperation> nextChOp = opsQueue.poll();

    assertNotNull("No operation was recorded for the default channel probe", nextChOp);

    // Expected value goes first, so a failure reports "expected" and "actual" correctly.
    assertEquals(ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME, nextChOp.get2());

    dfltCh = nextChOp.get1();
}
/**
 * Waits, one channel at a time, until every requested slot of {@link #channels} is populated.
 * Fails the test if a slot is still empty when the wait for it gives up.
 *
 * @param chIdxs Channel idxs.
 * @throws IgniteInterruptedCheckedException If the wait was interrupted.
 */
protected void awaitChannelsInit(int... chIdxs) throws IgniteInterruptedCheckedException {
    for (int idx : chIdxs) {
        boolean inited = GridTestUtils.waitForCondition(() -> channels[idx] != null, WAIT_TIMEOUT);

        assertTrue("Failed to wait for channel[" + idx + "] init", inited);
    }
}
/**
 * Affinity function used by the cache named {@code PART_CUSTOM_AFFINITY_CACHE_NAME}.
 * It is a subclass of {@link RendezvousAffinityFunction} that overrides nothing.
 * NOTE(review): presumably it exists only so that the cache's affinity function class
 * differs from the default one - confirm against the tests that use this cache.
 */
private static class CustomAffinityFunction extends RendezvousAffinityFunction {
// No-op.
}
/**
 * Test key composed of two int fields; it declares no affinity key.
 */
protected static class TestComplexKey {
    /** First field. */
    int firstField;

    /** Another field. */
    int secondField;

    /**
     * @param firstField First field value.
     * @param secondField Second field value.
     */
    public TestComplexKey(int firstField, int secondField) {
        this.secondField = secondField;
        this.firstField = firstField;
    }

    /**
     * {@inheritDoc}
     *
     * @return Sum of both fields.
     */
    @Override public int hashCode() {
        return Integer.sum(firstField, secondField);
    }
}
/**
 * Test key whose affinity key field is marked with {@link AffinityKeyMapped}.
 */
protected static class TestAnnotatedAffinityKey {
    /** Field annotated as the affinity key. */
    @AffinityKeyMapped
    int affinityKey;

    /** Field that is not annotated as the affinity key. */
    int anotherField;

    /**
     * @param affinityKey Affinity key value.
     * @param anotherField Value of the other field.
     */
    public TestAnnotatedAffinityKey(int affinityKey, int anotherField) {
        this.anotherField = anotherField;
        this.affinityKey = affinityKey;
    }

    /**
     * {@inheritDoc}
     *
     * @return Sum of both fields.
     */
    @Override public int hashCode() {
        return Integer.sum(affinityKey, anotherField);
    }
}
/**
 * Test key with an affinity key field that carries no annotation.
 */
protected static class TestNotAnnotatedAffinityKey {
    /** Complex affinity key, not annotated. */
    TestComplexKey affinityKey;

    /** Field that is not the affinity key. */
    int anotherField;

    /**
     * @param affinityKey Affinity key value.
     * @param anotherField Value of the other field.
     */
    public TestNotAnnotatedAffinityKey(TestComplexKey affinityKey, int anotherField) {
        this.anotherField = anotherField;
        this.affinityKey = affinityKey;
    }

    /**
     * {@inheritDoc}
     *
     * @return Hash code of the affinity key plus the other field.
     */
    @Override public int hashCode() {
        return Integer.sum(affinityKey.hashCode(), anotherField);
    }
}
/**
 * Test TCP client channel. On creation it registers itself in the enclosing test's channel array,
 * and it records serviced operations into the enclosing test's operations queue.
 */
protected class TestTcpClientChannel extends TcpClientChannel {
    /** Channel configuration. */
    private final ClientChannelConfiguration cfg;

    /** Set to {@code true} after {@link #close()} has returned from the parent implementation. */
    private volatile boolean closed;

    /**
     * @param cfg Config.
     * @param hnd Connection multiplexer passed to the parent channel.
     */
    public TestTcpClientChannel(ClientChannelConfiguration cfg, ClientConnectionMultiplexer hnd) {
        super(cfg, hnd);

        this.cfg = cfg;

        // Slot index is the offset of the server port from the default connector port.
        channels[cfg.getAddress().getPort() - DFLT_PORT] = this;

        addTopologyChangeListener(evtCh ->
            log.info("Topology change detected [ch=" + evtCh + ", topVer=" + evtCh.serverTopologyVersion() + ']'));
    }

    /**
     * {@inheritDoc}
     * <p>
     * The operation is recorded only after the parent call has returned.
     */
    @Override public <T> T service(
        ClientOperation op,
        Consumer<PayloadOutputChannel> payloadWriter,
        Function<PayloadInputChannel, T> payloadReader
    ) throws ClientException {
        T res = super.service(op, payloadWriter, payloadReader);

        recordIfTracked(op);

        return res;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The operation is recorded before the request is handed to the parent implementation.
     */
    @Override public <T> CompletableFuture<T> serviceAsync(
        ClientOperation op,
        Consumer<PayloadOutputChannel> payloadWriter,
        Function<PayloadInputChannel, T> payloadReader
    ) throws ClientException {
        recordIfTracked(op);

        return super.serviceAsync(op, payloadWriter, payloadReader);
    }

    /**
     * Stores all operations except binary type registration in queue to check later.
     *
     * @param op Operation passed to this channel.
     */
    private void recordIfTracked(ClientOperation op) {
        boolean binaryTypeRegistration =
            op == ClientOperation.REGISTER_BINARY_TYPE_NAME || op == ClientOperation.PUT_BINARY_TYPE;

        if (!binaryTypeRegistration)
            opsQueue.offer(new T2<>(this, op));
    }

    /** {@inheritDoc} */
    @Override public void close() {
        super.close();

        // Flag is raised only when the parent close() did not throw.
        closed = true;
    }

    /**
     * @return {@code true} if {@link #close()} has completed on this channel.
     */
    public boolean isClosed() {
        return closed;
    }

    /**
     * {@inheritDoc}
     *
     * @return String form of the configured channel address.
     */
    @Override public String toString() {
        return cfg.getAddress().toString();
    }
}
}