blob: e648d338894e8ac885b429574832b7ac2363905a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.apache.ignite.lang.ErrorGroups.Client.CLUSTER_ID_MISMATCH_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.CONFIGURATION_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.ignite.client.ClientOperationType;
import org.apache.ignite.client.IgniteClientConfiguration;
import org.apache.ignite.client.IgniteClientConnectionException;
import org.apache.ignite.client.RetryLimitPolicy;
import org.apache.ignite.client.RetryPolicy;
import org.apache.ignite.client.RetryPolicyContext;
import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.client.io.netty.NettyClientConnectionMultiplexer;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
/**
* Communication channel with failover and partition awareness.
*/
/**
 * Communication channel with failover and partition awareness.
 *
 * <p>Maintains one {@link ClientChannelHolder} per configured address, a round-robin cursor for load balancing,
 * and a "default" channel index used when no preferred node is requested. Thread-safe: {@code channels} and
 * {@code defaultChIdx} are guarded together by {@code curChannelsGuard}; holder creation is synchronized per holder.
 */
public final class ReliableChannel implements AutoCloseable {
    /** Channel factory. */
    private final ClientChannelFactory chFactory;

    /** Metrics. */
    private final ClientMetricSource metrics;

    /** Client channel holders for each configured address. */
    private volatile List<ClientChannelHolder> channels;

    /** Index of the default channel. */
    private volatile int defaultChIdx = -1;

    /** Index of the current channel (for round-robin balancing). */
    private final AtomicInteger curChIdx = new AtomicInteger();

    /** Client configuration. */
    private final IgniteClientConfiguration clientCfg;

    /** Node channels by name (consistent id). */
    private final Map<String, ClientChannelHolder> nodeChannelsByName = new ConcurrentHashMap<>();

    /** Channels reinit was scheduled. */
    private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();

    /** Channel is closed. */
    private volatile boolean closed;

    /** Fail (disconnect) listeners. */
    private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();

    /** Guards channels and curChIdx together. */
    private final ReadWriteLock curChannelsGuard = new ReentrantReadWriteLock();

    /** Connection manager. */
    private final ClientConnectionMultiplexer connMgr;

    private final IgniteLogger log;

    /** Cache addresses returned by {@code ThinClientAddressFinder}. */
    private volatile String[] prevHostAddrs;

    /** Latest known partition assignment timestamp (for any table). */
    private final AtomicLong partitionAssignmentTimestamp = new AtomicLong();

    /** Observable timestamp, or causality token. Sent by the server with every response, and required by some requests. */
    private final AtomicLong observableTimestamp = new AtomicLong();

    /** Cluster id from the first handshake. Used to reject connections to nodes from a different cluster. */
    private final AtomicReference<UUID> clusterId = new AtomicReference<>();

    /** Scheduled executor for streamer flush. Created lazily, guarded by {@code synchronized(this)}. */
    @Nullable
    private ScheduledExecutorService streamerFlushExecutor;

    /**
     * Constructor.
     *
     * @param chFactory Channel factory.
     * @param clientCfg Client config.
     * @param metrics Metrics source.
     */
    ReliableChannel(
            ClientChannelFactory chFactory,
            IgniteClientConfiguration clientCfg,
            ClientMetricSource metrics) {
        this.clientCfg = Objects.requireNonNull(clientCfg, "clientCfg");
        this.chFactory = Objects.requireNonNull(chFactory, "chFactory");
        this.log = ClientUtils.logger(clientCfg, ReliableChannel.class);
        this.metrics = metrics;

        connMgr = new NettyClientConnectionMultiplexer(metrics);
        connMgr.start(clientCfg);
    }

    /** {@inheritDoc} */
    @Override
    public synchronized void close() throws Exception {
        closed = true;

        List<ClientChannelHolder> holders = channels;

        // Close holders first, then the multiplexer, then the flush executor; collect all failures.
        IgniteUtils.closeAllManually(
                () -> {
                    if (holders != null) {
                        for (ClientChannelHolder hld : holders) {
                            hld.close();
                        }
                    }
                },
                connMgr::stop,
                () -> shutdownAndAwaitTermination(streamerFlushExecutor, 10, TimeUnit.SECONDS));
    }

    /**
     * Gets the metrics.
     *
     * @return Metrics.
     */
    public ClientMetricSource metrics() {
        return metrics;
    }

    /**
     * Gets active client connections.
     *
     * @return List of connected cluster nodes.
     */
    public List<ClusterNode> connections() {
        // Size by the map we actually iterate; also avoids an NPE when called before channels are initialized.
        List<ClusterNode> res = new ArrayList<>(nodeChannelsByName.size());

        for (var holder : nodeChannelsByName.values()) {
            var chFut = holder.chFut;

            if (chFut != null) {
                var ch = ClientFutureUtils.getNowSafe(chFut);

                if (ch != null && !ch.closed()) {
                    res.add(ch.protocolContext().clusterNode());
                }
            }
        }

        return res;
    }

    /**
     * Gets the client configuration.
     *
     * @return Client configuration.
     */
    public IgniteClientConfiguration configuration() {
        return clientCfg;
    }

    /**
     * Gets the observable timestamp (causality token) last received from the server.
     *
     * @return Observable timestamp.
     */
    public long observableTimestamp() {
        return observableTimestamp.get();
    }

    /**
     * Sends request and handles response asynchronously.
     *
     * @param opCode Operation code.
     * @param payloadWriter Payload writer.
     * @param payloadReader Payload reader.
     * @param <T> response type.
     * @param preferredNodeName Unique name (consistent id) of the preferred target node. When a connection to the specified node exists,
     *         it will be used to handle the request; otherwise, default connection will be used.
     * @param retryPolicyOverride Retry policy override.
     * @param expectNotifications Whether to expect server-side notifications for this request.
     * @return Future for the operation.
     */
    public <T> CompletableFuture<T> serviceAsync(
            int opCode,
            @Nullable PayloadWriter payloadWriter,
            @Nullable PayloadReader<T> payloadReader,
            @Nullable String preferredNodeName,
            @Nullable RetryPolicy retryPolicyOverride,
            boolean expectNotifications
    ) {
        return ClientFutureUtils.doWithRetryAsync(
                () -> getChannelAsync(preferredNodeName)
                        .thenCompose(ch -> serviceAsyncInternal(opCode, payloadWriter, payloadReader, expectNotifications, ch)),
                null,
                ctx -> shouldRetry(opCode, ctx, retryPolicyOverride));
    }

    /**
     * Sends request and handles response asynchronously.
     *
     * @param opCode Operation code.
     * @param payloadWriter Payload writer.
     * @param payloadReader Payload reader.
     * @param <T> response type.
     * @return Future for the operation.
     */
    public <T> CompletableFuture<T> serviceAsync(
            int opCode,
            PayloadWriter payloadWriter,
            @Nullable PayloadReader<T> payloadReader
    ) {
        return serviceAsync(opCode, payloadWriter, payloadReader, null, null, false);
    }

    /**
     * Sends request without payload and handles response asynchronously.
     *
     * @param opCode Operation code.
     * @param payloadReader Payload reader.
     * @param <T> Response type.
     * @return Future for the operation.
     */
    public <T> CompletableFuture<T> serviceAsync(int opCode, PayloadReader<T> payloadReader) {
        return serviceAsync(opCode, null, payloadReader, null, null, false);
    }

    private <T> CompletableFuture<T> serviceAsyncInternal(
            int opCode,
            @Nullable PayloadWriter payloadWriter,
            @Nullable PayloadReader<T> payloadReader,
            boolean expectNotifications,
            ClientChannel ch) {
        return ch.serviceAsync(opCode, payloadWriter, payloadReader, expectNotifications).whenComplete((res, err) -> {
            // A connection-level failure invalidates the channel; retry handling happens in the caller.
            if (err != null && unwrapConnectionException(err) != null) {
                onChannelFailure(ch);
            }
        });
    }

    private CompletableFuture<ClientChannel> getChannelAsync(@Nullable String preferredNodeName) {
        // 1. Preferred node connection.
        if (preferredNodeName != null) {
            ClientChannelHolder holder = nodeChannelsByName.get(preferredNodeName);

            if (holder != null) {
                return holder.getOrCreateChannelAsync().thenCompose(ch -> {
                    if (ch != null) {
                        return completedFuture(ch);
                    } else {
                        // Preferred node is unavailable - fall back to the default connection.
                        return getDefaultChannelAsync();
                    }
                });
            }
        }

        // 2. Round-robin connection.
        ClientChannel nextCh = getNextChannelWithoutReconnect();

        if (nextCh != null) {
            return completedFuture(nextCh);
        }

        // 3. Default connection (with reconnect if necessary).
        return getDefaultChannelAsync();
    }

    /**
     * Returns host:port_range address lines parsed as {@link InetSocketAddress} as a key. Value is the amount of appearances of an address
     * in {@code addrs} parameter.
     *
     * @return host:port_range address lines parsed as {@link InetSocketAddress} as a key. Value is the amount of appearances of an address
     *         in {@code addrs} parameter.
     */
    private static Map<InetSocketAddress, Integer> parsedAddresses(String[] addrs) {
        if (addrs == null || addrs.length == 0) {
            throw new IgniteException(CONFIGURATION_ERR, "Empty addresses");
        }

        Collection<HostAndPort> ranges = new ArrayList<>(addrs.length);

        for (String a : addrs) {
            ranges.add(HostAndPort.parse(a, IgniteClientConfiguration.DFLT_PORT, "Failed to parse Ignite server address"));
        }

        return ranges.stream()
                .map(p -> InetSocketAddress.createUnresolved(p.host(), p.port()))
                .collect(Collectors.toMap(a -> a, a -> 1, Integer::sum));
    }

    /**
     * Roll current default channel if specified holder equals to it.
     */
    private void rollCurrentChannel(ClientChannelHolder hld) {
        curChannelsGuard.writeLock().lock();

        try {
            int idx = defaultChIdx;
            List<ClientChannelHolder> holders = channels;

            ClientChannelHolder dfltHld = holders.get(idx);

            if (dfltHld == hld) {
                idx += 1;

                // Wrap around to the first holder.
                if (idx >= holders.size()) {
                    defaultChIdx = 0;
                } else {
                    defaultChIdx = idx;
                }
            }
        } finally {
            curChannelsGuard.writeLock().unlock();
        }
    }

    /**
     * On current channel failure.
     */
    private void onChannelFailure(ClientChannel ch) {
        // There is nothing wrong if defaultChIdx was concurrently changed, since channel was closed by another thread
        // when current index was changed and no other wrong channel will be closed by current thread because
        // onChannelFailure checks the channel bound to the holder before closing it.
        onChannelFailure(channels.get(defaultChIdx), ch);
    }

    /**
     * On channel of the specified holder failure.
     */
    private void onChannelFailure(ClientChannelHolder hld, @Nullable ClientChannel ch) {
        chFailLsnrs.forEach(Runnable::run);

        // Roll current channel even if a topology changes. To help find working channel faster.
        rollCurrentChannel(hld);

        // If a reinit was scheduled while we were connecting, run it now.
        if (scheduledChannelsReinit.get()) {
            channelsInitAsync();
        }
    }

    /**
     * Adds listener for the channel fail (disconnect).
     *
     * @param chFailLsnr Listener for the channel fail (disconnect).
     */
    public void addChannelFailListener(Runnable chFailLsnr) {
        chFailLsnrs.add(chFailLsnr);
    }

    /**
     * Should the channel initialization be stopped.
     */
    private boolean shouldStopChannelsReinit() {
        return scheduledChannelsReinit.get() || closed;
    }

    /**
     * Init channel holders to all nodes.
     *
     * @return boolean whether channels were reinited.
     */
    private synchronized boolean initChannelHolders() {
        List<ClientChannelHolder> holders = channels;

        // Enable parallel threads to schedule new init of channel holders.
        scheduledChannelsReinit.set(false);

        Map<InetSocketAddress, Integer> newAddrs = null;

        if (clientCfg.addressesFinder() != null) {
            String[] hostAddrs = clientCfg.addressesFinder().getAddresses();

            if (hostAddrs.length == 0) {
                throw new IgniteException(CONFIGURATION_ERR, "Empty addresses");
            }

            if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
                newAddrs = parsedAddresses(hostAddrs);
                prevHostAddrs = hostAddrs;
            }
        } else if (holders == null) {
            newAddrs = parsedAddresses(clientCfg.addresses());
        }

        // Address set did not change - nothing to do.
        if (newAddrs == null) {
            return true;
        }

        Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
        Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs.keySet());

        if (holders != null) {
            for (int i = 0; i < holders.size(); i++) {
                ClientChannelHolder h = holders.get(i);

                curAddrs.put(h.chCfg.getAddress(), h);
                allAddrs.add(h.chCfg.getAddress());
            }
        }

        List<ClientChannelHolder> reinitHolders = new ArrayList<>();

        // The variable holds a new index of default channel after topology change.
        // Suppose that reuse of the channel is better than open new connection.
        int dfltChannelIdx = -1;

        ClientChannelHolder currDfltHolder = null;

        int idx = defaultChIdx;

        if (idx != -1) {
            currDfltHolder = holders.get(idx);
        }

        for (InetSocketAddress addr : allAddrs) {
            if (shouldStopChannelsReinit()) {
                return false;
            }

            // Obsolete addr, to be removed.
            if (!newAddrs.containsKey(addr)) {
                curAddrs.get(addr).close();

                continue;
            }

            // Create new holders for new addrs.
            if (!curAddrs.containsKey(addr)) {
                // An address may appear multiple times in the configuration; add the holder once per appearance.
                ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));

                for (int i = 0; i < newAddrs.get(addr); i++) {
                    reinitHolders.add(hld);
                }

                continue;
            }

            // This holder is up to date.
            ClientChannelHolder hld = curAddrs.get(addr);

            for (int i = 0; i < newAddrs.get(addr); i++) {
                reinitHolders.add(hld);
            }

            if (hld == currDfltHolder) {
                dfltChannelIdx = reinitHolders.size() - 1;
            }
        }

        if (dfltChannelIdx == -1) {
            dfltChannelIdx = 0;
        }

        curChannelsGuard.writeLock().lock();

        try {
            channels = reinitHolders;
            defaultChIdx = dfltChannelIdx;
        } finally {
            curChannelsGuard.writeLock().unlock();
        }

        return true;
    }

    /**
     * Init channel holders, establish connection to default channel.
     */
    CompletableFuture<ClientChannel> channelsInitAsync() {
        // Do not establish connections if interrupted.
        if (!initChannelHolders()) {
            return nullCompletedFuture();
        }

        // Establish default channel connection.
        var fut = getDefaultChannelAsync();

        // Establish secondary connections in the background.
        fut.thenAccept(unused -> ForkJoinPool.commonPool().submit(this::initAllChannelsAsync));

        return fut;
    }

    private @Nullable ClientChannel getNextChannelWithoutReconnect() {
        curChannelsGuard.readLock().lock();

        try {
            int startIdx = curChIdx.incrementAndGet();

            for (int i = 0; i < channels.size(); i++) {
                // floorMod is always non-negative; Math.abs(Integer.MIN_VALUE) is negative and would
                // produce a negative index once the counter wraps around.
                int nextIdx = Math.floorMod(startIdx + i, channels.size());

                ClientChannelHolder hld = channels.get(nextIdx);
                ClientChannel ch = hld == null ? null : hld.getNow();

                if (ch != null) {
                    return ch;
                }
            }
        } finally {
            curChannelsGuard.readLock().unlock();
        }

        return null;
    }

    /**
     * Gets the default channel, reconnecting if necessary.
     */
    private CompletableFuture<ClientChannel> getDefaultChannelAsync() {
        return ClientFutureUtils.doWithRetryAsync(
                () -> {
                    curChannelsGuard.readLock().lock();

                    ClientChannelHolder hld;

                    try {
                        hld = channels.get(defaultChIdx);
                    } finally {
                        curChannelsGuard.readLock().unlock();
                    }

                    return hld.getOrCreateChannelAsync();
                },
                Objects::nonNull,
                ctx -> shouldRetry(ClientOperationType.CHANNEL_CONNECT, ctx, null));
    }

    private CompletableFuture<ClientChannel> getCurChannelAsync() {
        if (closed) {
            return CompletableFuture.failedFuture(new IgniteClientConnectionException(CONNECTION_ERR, "ReliableChannel is closed"));
        }

        curChannelsGuard.readLock().lock();

        try {
            var hld = channels.get(defaultChIdx);

            if (hld == null) {
                return nullCompletedFuture();
            }

            CompletableFuture<ClientChannel> fut = hld.getOrCreateChannelAsync();
            return fut == null ? nullCompletedFuture() : fut;
        } finally {
            curChannelsGuard.readLock().unlock();
        }
    }

    /** Determines whether specified operation should be retried. */
    private boolean shouldRetry(int opCode, ClientFutureUtils.RetryContext ctx, @Nullable RetryPolicy retryPolicyOverride) {
        ClientOperationType opType = ClientUtils.opCodeToClientOperationType(opCode);

        boolean res = shouldRetry(opType, ctx, retryPolicyOverride);

        if (log.isDebugEnabled()) {
            if (res) {
                log.debug("Retrying operation [opCode=" + opCode + ", opType=" + opType + ", attempt=" + ctx.attempt
                        + ", lastError=" + ctx.lastError() + ']');
            } else {
                log.debug("Not retrying operation [opCode=" + opCode + ", opType=" + opType + ", attempt=" + ctx.attempt
                        + ", lastError=" + ctx.lastError() + ']');
            }
        }

        if (res) {
            metrics.requestsRetriedIncrement();
        }

        return res;
    }

    /** Determines whether specified operation should be retried. */
    private boolean shouldRetry(
            @Nullable ClientOperationType opType,
            ClientFutureUtils.RetryContext ctx,
            @Nullable RetryPolicy retryPolicyOverride) {
        var err = ctx.lastError();

        if (err == null) {
            // Closed channel situation - no error, but connection should be retried.
            return opType == ClientOperationType.CHANNEL_CONNECT && ctx.attempt < RetryLimitPolicy.DFLT_RETRY_LIMIT;
        }

        IgniteClientConnectionException exception = unwrapConnectionException(err);

        if (exception == null) {
            return false;
        }

        // Connecting to a different cluster is a permanent failure - never retry.
        if (exception.code() == CLUSTER_ID_MISMATCH_ERR) {
            return false;
        }

        if (opType == null) {
            // System operation.
            return ctx.attempt < RetryLimitPolicy.DFLT_RETRY_LIMIT;
        }

        RetryPolicy plc = retryPolicyOverride != null ? retryPolicyOverride : clientCfg.retryPolicy();

        if (plc == null) {
            return false;
        }

        RetryPolicyContext retryPolicyContext = new RetryPolicyContextImpl(clientCfg, opType, ctx.attempt, exception);

        // Exception in shouldRetry will be handled by ClientFutureUtils.doWithRetryAsync
        return plc.shouldRetry(retryPolicyContext);
    }

    /**
     * Establish or repair connections to all configured servers.
     */
    private void initAllChannelsAsync() {
        List<ClientChannelHolder> holders = channels;
        List<CompletableFuture<ClientChannel>> futs = new ArrayList<>(holders.size());

        for (ClientChannelHolder hld : holders) {
            if (closed) {
                return;
            }

            try {
                futs.add(hld.getOrCreateChannelAsync(true));
            } catch (Exception e) {
                log.warn("Failed to establish connection to " + hld.chCfg.getAddress() + ": " + e.getMessage(), e);
            }
        }

        long interval = clientCfg.reconnectInterval();

        if (interval > 0 && !closed) {
            // After current round of connection attempts is finished, schedule the next one with a configured delay.
            CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new))
                    .whenCompleteAsync(
                            (res, err) -> initAllChannelsAsync(),
                            CompletableFuture.delayedExecutor(interval, TimeUnit.MILLISECONDS));
        }
    }

    private void onObservableTimestampReceived(long newTs) {
        // Timestamps can arrive out of order from different channels - keep the maximum.
        observableTimestamp.updateAndGet(curTs -> Math.max(curTs, newTs));
    }

    private void onPartitionAssignmentChanged(long timestamp) {
        partitionAssignmentTimestamp.updateAndGet(curTs -> Math.max(curTs, timestamp));
    }

    /**
     * Gets the last known primary replica start time (for any table).
     *
     * @return Primary replica max start time.
     */
    public long partitionAssignmentTimestamp() {
        return partitionAssignmentTimestamp.get();
    }

    /**
     * Gets the data streamer flush scheduled executor.
     *
     * @return Streamer flush executor.
     */
    public synchronized ScheduledExecutorService streamerFlushExecutor() {
        if (streamerFlushExecutor == null) {
            streamerFlushExecutor = Executors.newSingleThreadScheduledExecutor(
                    new NamedThreadFactory(
                            "client-data-streamer-flush-" + hashCode(),
                            ClientUtils.logger(clientCfg, ReliableChannel.class)));
        }

        return streamerFlushExecutor;
    }

    /**
     * Unwraps {@link CompletionException} layers and returns the cause if it is a connection exception.
     *
     * @param err Error to unwrap.
     * @return Connection exception, or {@code null} if {@code err} is not connection-related.
     */
    @Nullable
    private static IgniteClientConnectionException unwrapConnectionException(Throwable err) {
        while (err instanceof CompletionException) {
            err = err.getCause();
        }

        if (!(err instanceof IgniteClientConnectionException)) {
            return null;
        }

        return (IgniteClientConnectionException) err;
    }

    /**
     * Channels holder.
     */
    @SuppressWarnings("PackageVisibleInnerClass") // Visible for tests.
    class ClientChannelHolder {
        /** Channel configuration. */
        private final ClientChannelConfiguration chCfg;

        /** Channel. */
        private volatile @Nullable CompletableFuture<ClientChannel> chFut;

        /** The last server node that channel is or was connected to. */
        private volatile ClusterNode serverNode;

        /** Address that holder is bound to (chCfg.addr) is not in use now. So close the holder. */
        private volatile boolean close;

        /** Timestamps of reconnect retries. {@code null} when throttling is disabled. */
        private final long[] reconnectRetries;

        /**
         * Constructor.
         *
         * @param chCfg Channel config.
         */
        private ClientChannelHolder(ClientChannelConfiguration chCfg) {
            this.chCfg = chCfg;

            reconnectRetries = chCfg.clientConfiguration().reconnectThrottlingRetries() > 0
                    && chCfg.clientConfiguration().reconnectThrottlingPeriod() > 0L
                    ? new long[chCfg.clientConfiguration().reconnectThrottlingRetries()]
                    : null;
        }

        /**
         * Returns whether reconnect throttling should be applied.
         *
         * @return Whether reconnect throttling should be applied.
         */
        private boolean applyReconnectionThrottling() {
            if (reconnectRetries == null) {
                return false;
            }

            long ts = System.currentTimeMillis();

            // Find a slot whose last attempt is outside the throttling period; if none, throttle.
            for (int i = 0; i < reconnectRetries.length; i++) {
                if (ts - reconnectRetries[i] >= chCfg.clientConfiguration().reconnectThrottlingPeriod()) {
                    reconnectRetries[i] = ts;

                    return false;
                }
            }

            return true;
        }

        /**
         * Get or create channel.
         */
        private CompletableFuture<ClientChannel> getOrCreateChannelAsync() {
            return getOrCreateChannelAsync(false);
        }

        /**
         * Get or create channel.
         */
        private CompletableFuture<ClientChannel> getOrCreateChannelAsync(boolean ignoreThrottling) {
            if (close) {
                return nullCompletedFuture();
            }

            // Fast path: a connect is already in progress or completed with an open channel.
            var chFut0 = chFut;

            if (isFutureInProgressOrDoneAndChannelOpen(chFut0)) {
                return chFut0;
            }

            synchronized (this) {
                // Re-check under the lock (double-checked creation).
                if (close) {
                    return nullCompletedFuture();
                }

                chFut0 = chFut;

                if (isFutureInProgressOrDoneAndChannelOpen(chFut0)) {
                    return chFut0;
                }

                if (!ignoreThrottling && applyReconnectionThrottling()) {
                    return CompletableFuture.failedFuture(
                            new IgniteClientConnectionException(CONNECTION_ERR, "Reconnect is not allowed due to applied throttling"));
                }

                CompletableFuture<ClientChannel> createFut = chFactory.create(
                        chCfg,
                        connMgr,
                        metrics,
                        ReliableChannel.this::onPartitionAssignmentChanged,
                        ReliableChannel.this::onObservableTimestampReceived);

                chFut0 = createFut.thenApply(ch -> {
                    // Remember the cluster id from the first handshake; reject nodes from a different cluster.
                    var oldClusterId = clusterId.compareAndExchange(null, ch.protocolContext().clusterId());

                    if (oldClusterId != null && !oldClusterId.equals(ch.protocolContext().clusterId())) {
                        try {
                            ch.close();
                        } catch (Exception ignored) {
                            // Ignore
                        }

                        throw new IgniteClientConnectionException(
                                CLUSTER_ID_MISMATCH_ERR,
                                "Cluster ID mismatch: expected=" + oldClusterId + ", actual=" + ch.protocolContext().clusterId());
                    }

                    ClusterNode newNode = ch.protocolContext().clusterNode();

                    // There could be multiple holders map to the same serverNodeId if user provide the same
                    // address multiple times in configuration.
                    nodeChannelsByName.put(newNode.name(), this);

                    var oldServerNode = serverNode;

                    if (oldServerNode != null && !oldServerNode.id().equals(newNode.id())) {
                        // New node on the old address.
                        nodeChannelsByName.remove(oldServerNode.name(), this);
                    }

                    serverNode = newNode;

                    return ch;
                });

                chFut0.exceptionally(err -> {
                    closeChannel();
                    onChannelFailure(this, null);

                    log.warn("Failed to establish connection to " + chCfg.getAddress() + ": " + err.getMessage(), err);

                    return null;
                });

                chFut = chFut0;

                return chFut0;
            }
        }

        /**
         * Get channel if connected, or null otherwise.
         */
        private @Nullable ClientChannel getNow() {
            if (close) {
                return null;
            }

            var f = chFut;

            if (f == null) {
                return null;
            }

            var ch = ClientFutureUtils.getNowSafe(f);

            if (ch == null || ch.closed()) {
                return null;
            }

            return ch;
        }

        /**
         * Close channel.
         */
        private synchronized void closeChannel() {
            CompletableFuture<ClientChannel> ch0 = chFut;

            if (ch0 != null) {
                ch0.thenAccept(c -> {
                    try {
                        c.close();
                    } catch (Exception ignored) {
                        // No-op.
                    }
                });

                var oldServerNode = serverNode;

                if (oldServerNode != null) {
                    nodeChannelsByName.remove(oldServerNode.name(), this);
                }

                chFut = null;
            }
        }

        /**
         * Close holder.
         */
        void close() {
            close = true;

            var oldServerNode = serverNode;

            if (oldServerNode != null) {
                nodeChannelsByName.remove(oldServerNode.name(), this);
            }

            closeChannel();
        }

        private boolean isFutureInProgressOrDoneAndChannelOpen(@Nullable CompletableFuture<ClientChannel> f) {
            if (f == null || f.isCompletedExceptionally()) {
                return false;
            }

            if (!f.isDone()) {
                return true;
            }

            var ch = ClientFutureUtils.getNowSafe(f);

            return ch != null && !ch.closed();
        }
    }
}