blob: 7738fa0f91acb8ca3e0f995597a197687c5ad698 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
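* the License. You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.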
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.internal.client.thin;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.client.ClientAddressFinder;
import org.apache.ignite.client.ClientAuthorizationException;
import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
* Tests for ReliableChannel channels re-initialization.
public class ReliableChannelTest {
/** Mock factory for creating new channels: ignores both arguments and always returns a fresh {@link TestClientChannel}. */
private final BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory =
(cfg, hnd) -> new TestClientChannel();
/**
 * Default static address list (three entries) used by tests that configure addresses directly.
 * NOTE(review): all three entries are empty strings in this view; presumably the address literals were lost - confirm.
 */
private final String[] dfltAddrs = new String[]{"", "", ""};
* Checks that it is possible to configure addresses with duplication (for load balancing).
// Configures three address entries statically and expects one channel holder per entry.
public void testDuplicatedAddressesAreValid() {
ClientConfiguration ccfg = new ClientConfiguration().setAddresses(
"", "", "");
ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
// Three configured entries must produce three holders, i.e. no entry is dropped.
assertEquals(3, rc.getChannelHolders().size());
* Checks that if an address is specified without a port, the default port will be processed first.
// Configures a single address entry and checks both the number of holders and the initial channel index.
public void testAddressWithoutPort() {
ClientConfiguration ccfg = new ClientConfiguration().setAddresses("");
ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
// The test expects DFLT_PORT_RANGE + 1 holders for the single configured entry.
assertEquals(ClientConnectorConfiguration.DFLT_PORT_RANGE + 1, rc.getChannelHolders().size());
// The current channel is expected to be the first holder.
assertEquals(0, rc.getCurrentChannelIndex());
* Checks that ReliableChannel chooses a random address as the default from the set of addresses with the same (minimal) port.
// Compares, for two address configurations, the expected set of default-channel addresses
// with the set actually observed by usedDefaultChannels(...).
// NOTE(review): every address literal is an empty string in this view, so the two cases are
// indistinguishable here - confirm the real literals before relying on these assertions.
public void testDefaultChannelBalancing() {
assertEquals(new HashSet<>(F.asList("", "", "")),
usedDefaultChannels("", "", "", ""));
assertEquals(new HashSet<>(F.asList("", "", "", "")),
usedDefaultChannels("", "", "", ""));
/**
 * Builds a {@link ReliableChannel} 100 times over the given addresses and collects, as strings,
 * the first address of the holder found at the current channel index each time.
 *
 * @param addrs Addresses passed to {@link ClientConfiguration#setAddresses}.
 * @return Distinct address strings observed across all iterations.
 */
private Set<String> usedDefaultChannels(String... addrs) {
ClientConfiguration ccfg = new ClientConfiguration().setAddresses(addrs);
Set<String> usedChannels = new HashSet<>();
// A new ReliableChannel is created on every iteration; presumably this samples the random default choice.
for (int i = 0; i < 100; i++) {
ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
// NOTE(review): InetSocketAddress is not among the imports visible in this file - confirm the import exists.
InetSocketAddress addr = F.first(rc.getChannelHolders().get(rc.getCurrentChannelIndex()).getAddresses());
usedChannels.add(addr.toString().replace("/<unresolved>", "")); // Remove unnecessary part on JDK 17.
return usedChannels;
* Checks that reinitialization of duplicated address is correct.
// Feeds a sequence of address lists through a TestAddressFinder and, for each one, compares the
// addresses of the channel holders with the expected list.
public void testReinitDuplicatedAddress() {
// Eight responses are queued, matching the eight checks at the end of the test.
TestAddressFinder finder = new TestAddressFinder()
.nextAddresesResponse("", "", "")
.nextAddresesResponse("", "", "")
.nextAddresesResponse("", "", "")
.nextAddresesResponse("", "", "")
.nextAddresesResponse("", "", "")
.nextAddresesResponse("", "", "")
.nextAddresesResponse("", "", "")
.nextAddresesResponse("", "", "");
ClientConfiguration ccfg = new ClientConfiguration().setAddressesFinder(finder);
ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
// Maps every holder to the string form of its first address.
// NOTE(review): the pipeline below has no visible terminal operation although a List<String> is
// required - a collect-to-list step appears to be missing; confirm.
Supplier<List<String>> holderAddrs = () -> rc.getChannelHolders().stream()
.map(h -> F.first(h.getAddresses()).toString().replace("/<unresolved>", "")) // Replace unnecessary part on JDK 17.
// NOTE(review): despite its name, the visible lambda body only asserts equality; no call that
// re-initializes the channels is visible - confirm.
Consumer<List<String>> assertAddrReInitAndEqualsTo = (addrs) -> {
assertEquals(addrs, holderAddrs.get());
assertAddrReInitAndEqualsTo.accept(Arrays.asList("", "", ""));
assertAddrReInitAndEqualsTo.accept(Arrays.asList("", "", ""));
assertAddrReInitAndEqualsTo.accept(Arrays.asList("", "", ""));
assertAddrReInitAndEqualsTo.accept(Arrays.asList("", "", ""));
assertAddrReInitAndEqualsTo.accept(Arrays.asList("", "", ""));
assertAddrReInitAndEqualsTo.accept(Arrays.asList("", "", ""));
assertAddrReInitAndEqualsTo.accept(Arrays.asList("", "", ""));
assertAddrReInitAndEqualsTo.accept(Arrays.asList("", "", ""));
* Checks that channel holders are not reinited for static address configuration.
// Builds a configuration from the static default address list.
// NOTE(review): no assertion or call to checkDoesNotReinit(ccfg) is visible after the configuration
// is created - as shown, this test verifies nothing; confirm.
public void testChannelsNotReinitForStaticAddressConfiguration() {
ClientConfiguration ccfg = new ClientConfiguration().setAddresses(dfltAddrs);
* Checks that channel holders are not reinited if the address finder returns the same list of addresses.
// Builds a configuration whose addresses come from a TestAddressFinder.
// NOTE(review): only one finder response is queued and no assertion or call to
// checkDoesNotReinit(ccfg) is visible - confirm.
public void testChannelsNotReinitForStableDynamicAddressConfiguration() {
TestAddressFinder finder = new TestAddressFinder()
.nextAddresesResponse("", "", "");
ClientConfiguration ccfg = new ClientConfiguration().setAddressesFinder(finder);
/**
 * Asserts that the holder list of a {@link ReliableChannel} and its first three holders are the very
 * same instances when read a second time, and that the list still has three entries.
 *
 * @param ccfg Client configuration used to create the channel.
 */
private void checkDoesNotReinit(ClientConfiguration ccfg) {
ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
List<ReliableChannel.ClientChannelHolder> originalChannels = rc.getChannelHolders();
// Snapshot of the holder references, so that element identity can be compared later.
List<ReliableChannel.ClientChannelHolder> copyOriginalChannels = new ArrayList<>(originalChannels);
// Imitate topology change.
// NOTE(review): no statement imitating the topology change is visible at this point - confirm.
List<ReliableChannel.ClientChannelHolder> newChannels = rc.getChannelHolders();
// Identity (not equality) check: the list instance itself must be the same.
assertSame(originalChannels, newChannels);
for (int i = 0; i < 3; ++i) {
assertSame(copyOriginalChannels.get(i), newChannels.get(i));
assertEquals(3, newChannels.size());
* Checks that node channels are persisted if channels are reinitialized with static address configuration.
// Expects the number of node channels to stay at 1 before and after an imitated topology change.
public void testNodeChannelsAreNotCleaned() {
// NOTE(review): the configuration statement is unterminated in this view - the chained setter call
// appears to be missing; confirm.
ClientConfiguration ccfg = new ClientConfiguration()
ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
// Trigger TestClientChannel creation.
rc.service(null, null, null);
assertEquals(1, rc.getNodeChannels().size());
// Imitate topology change.
// NOTE(review): no statement imitating the topology change is visible at this point - confirm.
assertEquals(1, rc.getNodeChannels().size());
* Checks that channels are changed (new ones added, old ones removed) and that old channels are closed when reinitialization is performed.
// Checks holder state after a dynamic address change: two original holders closed, one reused,
// and two open holders in the resulting list.
public void testDynamicAddressReinitializedCorrectly() {
// NOTE(review): only one finder response is visible, although the test compares holders before
// and after a change - confirm.
TestAddressFinder finder = new TestAddressFinder()
.nextAddresesResponse("", "");
ClientConfiguration ccfg = new ClientConfiguration().setAddressesFinder(finder);
ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
// Read-only view of the holder list obtained before the change.
List<ReliableChannel.ClientChannelHolder> originChannels = Collections.unmodifiableList(rc.getChannelHolders());
// Imitate topology change.
// NOTE(review): no statement imitating the topology change is visible at this point - confirm.
assertEquals(2, F.size(originChannels, ReliableChannel.ClientChannelHolder::isClosed));
// NOTE(review): the source of the pipeline below and its terminal operation are not visible - confirm.
List<ReliableChannel.ClientChannelHolder> reuseChannel =
.filter(c -> !c.isClosed())
assertEquals(1, reuseChannel.size());
List<ReliableChannel.ClientChannelHolder> newChannels = rc.getChannelHolders();
assertEquals(2, newChannels.size());
// The reused holder must appear, by identity, in one of the two positions of the new list.
assertTrue(newChannels.get(0) == reuseChannel.get(0) || newChannels.get(1) == reuseChannel.get(0));
newChannels.forEach(c -> assertFalse(c.isClosed()));
* Check that node channels are cleaned in case of full reinitialization.
// Expects the number of node channels to drop from 1 to 0 after an imitated topology change.
public void testThatNodeChannelsCleanFullReinitialization() {
TestAddressFinder finder = new TestAddressFinder()
.nextAddresesResponse("", "");
// NOTE(review): the configuration statement is unterminated in this view and 'finder' is never
// passed to it - the chained setter call appears to be missing; confirm.
ClientConfiguration ccfg = new ClientConfiguration()
ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
// Trigger TestClientChannel creation.
rc.service(null, null, null);
assertEquals(1, rc.getNodeChannels().size());
// Imitate topology change.
// NOTE(review): no statement imitating the topology change is visible at this point - confirm.
assertEquals(0, rc.getNodeChannels().size());
* Should fail if default channel is not initialized.
// Uses a channel factory whose product always throws on construction; the test passes only if
// TestChannelException propagates out of the test method.
@Test(expected = TestChannelException.class)
public void testFailOnInitIfDefaultChannelFailed() {
// NOTE(review): the configuration statement is unterminated in this view, and no call on 'rc'
// that would invoke the factory is visible - confirm.
ClientConfiguration ccfg = new ClientConfiguration()
ReliableChannel rc = new ReliableChannel((cfg, hnd) -> new TestFailureClientChannel(), ccfg, null);
* Async operation should fail if cluster is down after send operation.
// Runs checkFailAfterSendOperation with channelsReinitOnFail = false.
public void testFailOnAsyncAfterSendOperation() {
checkFailAfterSendOperation(cache -> {
// NOTE(review): the body of the try block (the cache operation under test) is not visible - confirm.
try {
catch (Exception e) {
// Any exception is rethrown wrapped, with the original kept as the cause.
throw new RuntimeException(e);
}, false);
* Async operation should fail if cluster is down after send operation and handle topology change.
// Runs checkFailAfterSendOperation with channelsReinitOnFail = true.
public void testFailOnAsyncTopologyChangeAfterSendOperation() {
checkFailAfterSendOperation(cache -> {
// NOTE(review): the body of the try block (the cache operation under test) is not visible - confirm.
try {
catch (Exception e) {
// Any exception is rethrown wrapped, with the original kept as the cause.
throw new RuntimeException(e);
}, true);
/**
 * Runs a cache operation against a {@link ReliableChannel} whose factory yields one channel that
 * fails asynchronously and afterwards only channels that fail on construction, and asserts that
 * the operation throws with {@link TestChannelException} as a cause.
 *
 * @param op Cache operation to execute.
 * @param channelsReinitOnFail NOTE(review): not used in the visible body - confirm.
 */
private void checkFailAfterSendOperation(Consumer<TcpClientCache> op, boolean channelsReinitOnFail) {
// NOTE(review): the configuration statement is unterminated in this view - chained setter calls
// appear to be missing; confirm.
ClientConfiguration ccfg = new ClientConfiguration()
// Emulate cluster is down after TcpClientChannel#send operation.
AtomicInteger step = new AtomicInteger();
ReliableChannel rc = new ReliableChannel((cfg, hnd) -> {
// First factory call: a channel whose serviceAsync() completes exceptionally.
if (step.getAndIncrement() == 0)
return new TestAsyncServiceFailureClientChannel();
// Every later call: the constructor throws TestChannelException.
return new TestFailureClientChannel();
}, ccfg, null);
ClientBinaryMarshaller marsh = mock(ClientBinaryMarshaller.class);
TcpClientTransactions transactions = mock(TcpClientTransactions.class);
TcpClientCache cache = new TcpClientCache("", rc, marsh, transactions, null, false, null);
GridTestUtils.assertThrowsWithCause(() -> op.accept(cache), TestChannelException.class);
* Mock for client channel.
// Stub ClientChannel: service calls return null, listener registration is a no-op, and the
// channel never reports itself as closed.
private static class TestClientChannel implements ClientChannel {
/** Random node ID generated once per channel instance. */
private final UUID serverNodeId = UUID.randomUUID();
/** {@inheritDoc} */
@Override public <T> T service(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader)
throws ClientException, ClientAuthorizationException, ClientServerError, ClientConnectionException {
// All arguments are ignored.
return null;
/** {@inheritDoc} */
@Override public <T> CompletableFuture<T> serviceAsync(
ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader) {
// Returns null rather than a future; subclasses override this when a future is needed.
return null;
/** {@inheritDoc} */
@Override public ProtocolContext protocolCtx() {
// A new context for the latest protocol version is created on every call.
return new ProtocolContext(ProtocolVersion.LATEST_VER, null);
/** {@inheritDoc} */
@Override public UUID serverNodeId() {
return serverNodeId;
/** {@inheritDoc} */
@Override public AffinityTopologyVersion serverTopologyVersion() {
return null;
/** {@inheritDoc} */
@Override public void addTopologyChangeListener(Consumer<ClientChannel> lsnr) {
/* No-op */
/** {@inheritDoc} */
@Override public void addNotificationListener(ClientNotificationType type, Long rsrcId,
NotificationListener lsnr) {
/* No-op */
/** {@inheritDoc} */
@Override public void removeNotificationListener(ClientNotificationType type, Long rsrcId) {
/* No-op */
/** {@inheritDoc} */
@Override public boolean closed() {
// Always reports an open channel.
return false;
/** {@inheritDoc} */
@Override public void close() throws Exception {
/* No-op */
* Mock client channel that fails on initialization.
// Channel that can never be created: its only constructor throws.
private static class TestFailureClientChannel extends TestClientChannel {
/** Constructor that always fails with {@link TestChannelException}. */
private TestFailureClientChannel() {
throw new TestChannelException();
* Mock client channel whose asynchronous service call completes exceptionally.
// Channel that is created successfully but whose serviceAsync() returns a future that is already failed.
private static class TestAsyncServiceFailureClientChannel extends TestClientChannel {
/** {@inheritDoc} */
@Override public <T> CompletableFuture<T> serviceAsync(ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader) {
// Emulate that TcpClientChannel#send is ok, but TcpClientChannel#receive is failed.
CompletableFuture<T> fut = new CompletableFuture<>();
// The future is failed with a ClientConnectionException before it is returned.
fut.completeExceptionally(new ClientConnectionException(null));
return fut;
* Exception thrown by the TestFailureClientChannel constructor.
// Unchecked marker exception; the tests match on this type to tell the simulated failure from other errors.
private static class TestChannelException extends RuntimeException {}
* Mock for address finder.
// Address finder backed by a queue of canned responses: each getAddresses() call consumes one entry.
private static class TestAddressFinder implements ClientAddressFinder {
/** Queue of address arrays. Every {@link #getAddresses()} request polls one entry from this queue. */
private final Queue<String[]> addrResQueue;
/** Creates a finder with an empty response queue. */
private TestAddressFinder() {
addrResQueue = new LinkedList<>();
* Configure result for every next {@link #getAddresses()} request.
// NOTE(review): in the visible body 'addrs' is never added to addrResQueue, so the queue would stay
// empty and getAddresses() would always throw - the enqueue statement appears to be missing; confirm.
private TestAddressFinder nextAddresesResponse(String... addrs) {
// Returns this finder so that calls can be chained.
return this;
/** {@inheritDoc} */
@Override public String[] getAddresses() {
// An empty queue means the test did not plan for this request, so fail loudly.
if (addrResQueue.isEmpty())
throw new IllegalStateException("Server address request is not expected.");
return addrResQueue.poll();