blob: f6c60f164dca369b1b3e7efefaad164705dc4a11 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.client.IgniteClientAuthenticationException;
import org.apache.ignite.client.IgniteClientConfiguration;
import org.apache.ignite.client.IgniteClientConnectionException;
import org.apache.ignite.client.IgniteClientException;
import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.client.io.netty.NettyClientConnectionMultiplexer;
/**
* Communication channel with failover and partition awareness.
*/
public final class ReliableChannel implements AutoCloseable {
/** Do nothing helper function. */
private static final Consumer<Integer> DO_NOTHING = (v) -> {};
/** Channel factory. */
private final BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory;
/** Client channel holders for each configured address. */
private volatile List<ClientChannelHolder> channels;
/** Index of the current channel. */
private volatile int curChIdx = -1;
/** Client configuration. */
private final IgniteClientConfiguration clientCfg;
/** Node channels. */
private final Map<UUID, ClientChannelHolder> nodeChannels = new ConcurrentHashMap<>();
/** Channels reinit was scheduled. */
private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
/** Channel is closed. */
private volatile boolean closed;
/** Fail (disconnect) listeners. */
private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
/** Guard channels and curChIdx together. */
private final ReadWriteLock curChannelsGuard = new ReentrantReadWriteLock();
/** Connection manager. */
private final ClientConnectionMultiplexer connMgr;
/** Cache addresses returned by {@code ThinClientAddressFinder}. */
private volatile String[] prevHostAddrs;
/**
* Constructor.
*
* @param chFactory Channel factory.
* @param clientCfg Client config.
*/
ReliableChannel(BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory,
IgniteClientConfiguration clientCfg) {
if (chFactory == null)
throw new NullPointerException("chFactory");
if (clientCfg == null)
throw new NullPointerException("clientCfg");
this.clientCfg = clientCfg;
this.chFactory = chFactory;
connMgr = new NettyClientConnectionMultiplexer();
connMgr.start();
}
/** {@inheritDoc} */
@Override public synchronized void close() {
closed = true;
connMgr.stop();
List<ClientChannelHolder> holders = channels;
if (holders != null) {
for (ClientChannelHolder hld: holders)
hld.close();
}
}
/**
* Sends request and handles response asynchronously.
*
* @param opCode Operation code.
* @param payloadWriter Payload writer.
* @param payloadReader Payload reader.
* @param <T> response type.
* @return Future for the operation.
*/
public <T> CompletableFuture<T> serviceAsync(
int opCode,
PayloadWriter payloadWriter,
PayloadReader<T> payloadReader
) {
CompletableFuture<T> fut = new CompletableFuture<>();
// Use the only one attempt to avoid blocking async method.
handleServiceAsync(fut, opCode, payloadWriter, payloadReader, 1, null);
return fut;
}
private <T> void handleServiceAsync(final CompletableFuture<T> fut,
int opCode,
PayloadWriter payloadWriter,
PayloadReader<T> payloadReader,
int attemptsLimit,
IgniteClientConnectionException failure) {
ClientChannel ch;
// Workaround to store used attempts value within lambda body.
var attemptsCnt = new int[1];
try {
ch = applyOnDefaultChannel(channel -> channel, attemptsLimit, v -> attemptsCnt[0] = v );
}
catch (Throwable ex) {
if (failure != null) {
failure.addSuppressed(ex);
fut.completeExceptionally(failure);
return;
}
fut.completeExceptionally(ex);
return;
}
ch
.serviceAsync(opCode, payloadWriter, payloadReader)
.handle((res, err) -> {
if (err == null) {
fut.complete(res);
return null;
}
IgniteClientConnectionException failure0 = failure;
if (err instanceof IgniteClientConnectionException) {
try {
// Will try to reinit channels if topology changed.
onChannelFailure(ch);
}
catch (Throwable ex) {
fut.completeExceptionally(ex);
return null;
}
if (failure0 == null)
failure0 = (IgniteClientConnectionException)err;
else
failure0.addSuppressed(err);
int leftAttempts = attemptsLimit - attemptsCnt[0];
// If it is a first retry then reset attempts (as for initialization we use only 1 attempt).
if (failure == null)
leftAttempts = getRetryLimit() - 1;
if (leftAttempts > 0) {
handleServiceAsync(fut, opCode, payloadWriter, payloadReader, leftAttempts, failure0);
return null;
}
}
else {
fut.completeExceptionally(err instanceof IgniteClientException
? err
: new IgniteClientException(err.getMessage(), err));
return null;
}
fut.completeExceptionally(failure0);
return null;
});
}
/**
* Sends request without payload and handles response asynchronously.
*
* @param opCode Operation code.
* @param payloadReader Payload reader.
* @param <T> Response type.
* @return Future for the operation.
*/
public <T> CompletableFuture<T> serviceAsync(int opCode, PayloadReader<T> payloadReader) {
return serviceAsync(opCode, null, payloadReader);
}
/**
* Sends request with payload and handles response asynchronously.
* @param opCode Operation code.
* @param payloadWriter Payload writer.
* @return Future for the operation.
*/
public CompletableFuture<Void> requestAsync(int opCode, PayloadWriter payloadWriter) {
return serviceAsync(opCode, payloadWriter, null);
}
/**
* @return host:port_range address lines parsed as {@link InetSocketAddress} as a key. Value is the amount of
* appearences of an address in {@code addrs} parameter.
*/
private static Map<InetSocketAddress, Integer> parsedAddresses(String[] addrs) throws IgniteClientException {
if (addrs == null || addrs.length == 0)
throw new IgniteClientException("Empty addresses");
Collection<HostAndPortRange> ranges = new ArrayList<>(addrs.length);
for (String a : addrs) {
ranges.add(HostAndPortRange.parse(
a,
IgniteClientConfiguration.DFLT_PORT,
IgniteClientConfiguration.DFLT_PORT + IgniteClientConfiguration.DFLT_PORT_RANGE,
"Failed to parse Ignite server address"
));
}
return ranges.stream()
.flatMap(r -> IntStream
.rangeClosed(r.portFrom(), r.portTo()).boxed()
.map(p -> InetSocketAddress.createUnresolved(r.host(), p))
)
.collect(Collectors.toMap(a -> a, a -> 1, Integer::sum));
}
/**
* Roll current default channel if specified holder equals to it.
*/
private void rollCurrentChannel(ClientChannelHolder hld) {
curChannelsGuard.writeLock().lock();
try {
int idx = curChIdx;
List<ClientChannelHolder> holders = channels;
ClientChannelHolder dfltHld = holders.get(idx);
if (dfltHld == hld) {
idx += 1;
if (idx >= holders.size())
curChIdx = 0;
else
curChIdx = idx;
}
}
finally {
curChannelsGuard.writeLock().unlock();
}
}
/**
* On current channel failure.
*/
private void onChannelFailure(ClientChannel ch) {
// There is nothing wrong if curChIdx was concurrently changed, since channel was closed by another thread
// when current index was changed and no other wrong channel will be closed by current thread because
// onChannelFailure checks channel binded to the holder before closing it.
onChannelFailure(channels.get(curChIdx), ch);
}
/**
* On channel of the specified holder failure.
*/
private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
if (ch != null && ch == hld.ch)
hld.closeChannel();
chFailLsnrs.forEach(Runnable::run);
// Roll current channel even if a topology changes. To help find working channel faster.
rollCurrentChannel(hld);
if (scheduledChannelsReinit.get())
channelsInit();
}
/**
* @param chFailLsnr Listener for the channel fail (disconnect).
*/
public void addChannelFailListener(Runnable chFailLsnr) {
chFailLsnrs.add(chFailLsnr);
}
/**
* Should the channel initialization be stopped.
*/
private boolean shouldStopChannelsReinit() {
return scheduledChannelsReinit.get() || closed;
}
/**
* Init channel holders to all nodes.
* @return boolean wheter channels was reinited.
*/
synchronized boolean initChannelHolders() {
List<ClientChannelHolder> holders = channels;
// Enable parallel threads to schedule new init of channel holders.
scheduledChannelsReinit.set(false);
Map<InetSocketAddress, Integer> newAddrs = null;
if (clientCfg.getAddressesFinder() != null) {
String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
if (hostAddrs.length == 0)
throw new IgniteClientException("Empty addresses");
if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
newAddrs = parsedAddresses(hostAddrs);
prevHostAddrs = hostAddrs;
}
}
else if (holders == null)
newAddrs = parsedAddresses(clientCfg.getAddresses());
if (newAddrs == null)
return true;
Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs.keySet());
if (holders != null) {
for (int i = 0; i < holders.size(); i++) {
ClientChannelHolder h = holders.get(i);
curAddrs.put(h.chCfg.getAddress(), h);
allAddrs.add(h.chCfg.getAddress());
}
}
List<ClientChannelHolder> reinitHolders = new ArrayList<>();
// The variable holds a new index of default channel after topology change.
// Suppose that reuse of the channel is better than open new connection.
int dfltChannelIdx = -1;
ClientChannelHolder currDfltHolder = null;
int idx = curChIdx;
if (idx != -1)
currDfltHolder = holders.get(idx);
for (InetSocketAddress addr : allAddrs) {
if (shouldStopChannelsReinit())
return false;
// Obsolete addr, to be removed.
if (!newAddrs.containsKey(addr)) {
curAddrs.get(addr).close();
continue;
}
// Create new holders for new addrs.
if (!curAddrs.containsKey(addr)) {
ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));
for (int i = 0; i < newAddrs.get(addr); i++)
reinitHolders.add(hld);
continue;
}
// This holder is up to date.
ClientChannelHolder hld = curAddrs.get(addr);
for (int i = 0; i < newAddrs.get(addr); i++)
reinitHolders.add(hld);
if (hld == currDfltHolder)
dfltChannelIdx = reinitHolders.size() - 1;
}
if (dfltChannelIdx == -1)
dfltChannelIdx = new Random().nextInt(reinitHolders.size());
curChannelsGuard.writeLock().lock();
try {
channels = reinitHolders;
curChIdx = dfltChannelIdx;
}
finally {
curChannelsGuard.writeLock().unlock();
}
return true;
}
/**
* Establishing connections to servers. If partition awareness feature is enabled connections are created
* for every configured server. Otherwise only default channel is connected.
*/
void channelsInit() {
// Do not establish connections if interrupted.
if (!initChannelHolders())
return;
// Apply no-op function. Establish default channel connection.
applyOnDefaultChannel(channel -> null);
}
/** */
private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function) {
return applyOnDefaultChannel(function, getRetryLimit(), DO_NOTHING);
}
/**
* Apply specified {@code function} on any of available channel.
*/
private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function,
int attemptsLimit,
Consumer<Integer> attemptsCallback) {
Throwable failure = null;
for (int attempt = 0; attempt < attemptsLimit; attempt++) {
ClientChannelHolder hld = null;
ClientChannel c = null;
try {
if (closed)
throw new IgniteClientException("Channel is closed");
curChannelsGuard.readLock().lock();
try {
hld = channels.get(curChIdx);
}
finally {
curChannelsGuard.readLock().unlock();
}
c = hld.getOrCreateChannel();
if (c != null) {
attemptsCallback.accept(attempt + 1);
return function.apply(c);
}
}
catch (Throwable e) {
if (failure == null)
failure = e;
else
failure.addSuppressed(e);
onChannelFailure(hld, c);
}
}
throw new IgniteClientConnectionException("Failed to connect", failure);
}
/** Get retry limit. */
private int getRetryLimit() {
List<ClientChannelHolder> holders = channels;
if (holders == null)
throw new IgniteClientException("Connections to nodes aren't initialized.");
int size = holders.size();
return clientCfg.getRetryLimit() > 0 ? Math.min(clientCfg.getRetryLimit(), size) : size;
}
/**
* Channels holder.
*/
@SuppressWarnings("PackageVisibleInnerClass") // Visible for tests.
class ClientChannelHolder {
/** Channel configuration. */
private final ClientChannelConfiguration chCfg;
/** Channel. */
private volatile ClientChannel ch;
/** ID of the last server node that {@link ch} is or was connected to. */
private volatile UUID serverNodeId;
/** Address that holder is bind to (chCfg.addr) is not in use now. So close the holder. */
private volatile boolean close;
/** Timestamps of reconnect retries. */
private final long[] reconnectRetries;
/**
* @param chCfg Channel config.
*/
private ClientChannelHolder(ClientChannelConfiguration chCfg) {
this.chCfg = chCfg;
reconnectRetries = chCfg.getReconnectThrottlingRetries() > 0 && chCfg.getReconnectThrottlingPeriod() > 0L ?
new long[chCfg.getReconnectThrottlingRetries()] : null;
}
/**
* @return Whether reconnect throttling should be applied.
*/
private boolean applyReconnectionThrottling() {
if (reconnectRetries == null)
return false;
long ts = System.currentTimeMillis();
for (int i = 0; i < reconnectRetries.length; i++) {
if (ts - reconnectRetries[i] >= chCfg.getReconnectThrottlingPeriod()) {
reconnectRetries[i] = ts;
return false;
}
}
return true;
}
/**
* Get or create channel.
*/
private ClientChannel getOrCreateChannel()
throws IgniteClientConnectionException, IgniteClientAuthenticationException {
return getOrCreateChannel(false);
}
/**
* Get or create channel.
*/
private ClientChannel getOrCreateChannel(boolean ignoreThrottling)
throws IgniteClientConnectionException, IgniteClientAuthenticationException {
if (ch == null && !close) {
synchronized (this) {
if (close)
return null;
if (ch != null)
return ch;
if (!ignoreThrottling && applyReconnectionThrottling())
throw new IgniteClientConnectionException("Reconnect is not allowed due to applied throttling");
ch = chFactory.apply(chCfg, connMgr);
}
}
return ch;
}
/**
* Close channel.
*/
private synchronized void closeChannel() {
if (ch != null) {
try {
ch.close();
}
catch (Exception ignored) {
}
ch = null;
}
}
/**
* Close holder.
*/
void close() {
close = true;
if (serverNodeId != null)
nodeChannels.remove(serverNodeId, this);
closeChannel();
}
}
}