/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.network.netty;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.network.ChannelType.getChannel;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.recovery.DescriptorAcquiry;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
import org.apache.ignite.internal.network.recovery.StaleIdDetector;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
* Class that manages both incoming and outgoing connections.
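*
* <p>A minimal usage sketch (all collaborator instances below are placeholders, assumed to be wired up elsewhere):
* <pre>{@code
* ConnectionManager manager = new ConnectionManager(
*         networkConfiguration, serializationService, "node-1",
*         bootstrapFactory, staleIdDetector, clusterIdSupplier, failureManager);
* manager.start();
* manager.setLocalNode(localNode); // Required before connections can be served.
* OrderingFuture<NettySender> sender = manager.channel(null, channelType, remoteAddress);
* }</pre>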
*/
public class ConnectionManager implements ChannelCreationListener {
/** Message factory. */
private static final NetworkMessagesFactory FACTORY = new NetworkMessagesFactory();
/** Logger. */
private static final IgniteLogger LOG = Loggers.forClass(ConnectionManager.class);
/** Latest version of the direct marshalling protocol. */
public static final byte DIRECT_PROTOCOL_VERSION = 1;
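/** Maximum number of retries when trying to open a channel to a node (see {@link #getChannelWithRetry}). */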
private static final int MAX_RETRIES_TO_OPEN_CHANNEL = 10;
/** Client bootstrap. */
private final Bootstrap clientBootstrap;
/** Server. */
private final NettyServer server;
/** Channels map from {@link ConnectorKey} (consistent id plus channel type) to {@link NettySender}. */
private final Map<ConnectorKey<String>, NettySender> channels = new ConcurrentHashMap<>();
/** Clients. */
private final Map<ConnectorKey<InetSocketAddress>, NettyClient> clients = new ConcurrentHashMap<>();
/** Serialization service. */
private final SerializationService serializationService;
/** Message listeners. */
private final List<Consumer<InNetworkObject>> listeners = new CopyOnWriteArrayList<>();
/** Node consistent id. */
private final String consistentId;
/**
* Completed when the local node is set; attempts to initiate a connection to this node from the outside will wait
* until it completes.
*/
private final CompletableFuture<ClusterNode> localNodeFuture = new CompletableFuture<>();
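/** Netty bootstrap factory. */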
private final NettyBootstrapFactory bootstrapFactory;
/** Used to detect that a peer uses a stale ID. */
private final StaleIdDetector staleIdDetector;
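/** Supplier of the cluster ID. */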
private final ClusterIdSupplier clusterIdSupplier;
/** Factory producing {@link RecoveryClientHandshakeManager} instances. */
private final @Nullable RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactory;
/** Start flag. */
private final AtomicBoolean started = new AtomicBoolean(false);
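/** Stopping flag: set when {@link #initiateStopping()} is called. */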
private final AtomicBoolean stopping = new AtomicBoolean(false);
/** Stop flag. */
private final AtomicBoolean stopped = new AtomicBoolean(false);
/** Recovery descriptor provider. */
private final RecoveryDescriptorProvider descriptorProvider = new DefaultRecoveryDescriptorProvider();
/** Network configuration. */
private final NetworkView networkConfiguration;
/** Thread pool used for connection management tasks (like disposing recovery descriptors on node left or on stop). */
private final ExecutorService connectionMaintenanceExecutor;
/** Failure processor. */
private final FailureManager failureManager;
/**
* Constructor.
*
* @param networkConfiguration Network configuration.
* @param serializationService Serialization service.
* @param consistentId Consistent id of this node.
* @param bootstrapFactory Bootstrap factory.
* @param staleIdDetector Detects stale member IDs.
* @param clusterIdSupplier Supplier of cluster ID.
* @param failureManager Used to fail the node if a critical failure happens.
*/
public ConnectionManager(
NetworkView networkConfiguration,
SerializationService serializationService,
String consistentId,
NettyBootstrapFactory bootstrapFactory,
StaleIdDetector staleIdDetector,
ClusterIdSupplier clusterIdSupplier,
FailureManager failureManager
) {
this(
networkConfiguration,
serializationService,
consistentId,
bootstrapFactory,
staleIdDetector,
clusterIdSupplier,
null,
failureManager
);
}
/**
* Constructor.
*
* @param networkConfiguration Network configuration.
* @param serializationService Serialization service.
* @param consistentId Consistent id of this node.
* @param bootstrapFactory Bootstrap factory.
* @param staleIdDetector Detects stale member IDs.
* @param clusterIdSupplier Supplier of cluster ID.
* @param clientHandshakeManagerFactory Factory for {@link RecoveryClientHandshakeManager} instances.
* @param failureManager Used to fail the node if a critical failure happens.
*/
public ConnectionManager(
NetworkView networkConfiguration,
SerializationService serializationService,
String consistentId,
NettyBootstrapFactory bootstrapFactory,
StaleIdDetector staleIdDetector,
ClusterIdSupplier clusterIdSupplier,
@Nullable RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactory,
FailureManager failureManager
) {
this.serializationService = serializationService;
this.consistentId = consistentId;
this.bootstrapFactory = bootstrapFactory;
this.staleIdDetector = staleIdDetector;
this.clusterIdSupplier = clusterIdSupplier;
this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
this.networkConfiguration = networkConfiguration;
this.failureManager = failureManager;
this.server = new NettyServer(
networkConfiguration,
this::createServerHandshakeManager,
this::onMessage,
serializationService,
bootstrapFactory
);
this.clientBootstrap = bootstrapFactory.createClientBootstrap();
// We don't just use Executors#newSingleThreadExecutor() here because it would keep the maintenance thread
// alive forever, while we only need it from time to time, so keeping it always alive would be wasteful.
ThreadPoolExecutor maintenanceExecutor = new ThreadPoolExecutor(
1,
1,
1,
SECONDS,
new LinkedBlockingQueue<>(),
NamedThreadFactory.create(consistentId, "connection-maintenance", LOG)
);
maintenanceExecutor.allowCoreThreadTimeOut(true);
connectionMaintenanceExecutor = maintenanceExecutor;
}
/**
* Starts the server.
*
* @throws IgniteInternalException If failed to start.
*/
public void start() throws IgniteInternalException {
try {
boolean wasStarted = started.getAndSet(true);
if (wasStarted) {
throw new IgniteInternalException("Attempted to start an already started connection manager");
}
if (stopped.get()) {
throw new IgniteInternalException("Attempted to start an already stopped connection manager");
}
server.start().get();
LOG.info("Server started [address={}]", server.address());
} catch (ExecutionException e) {
Throwable cause = e.getCause();
throw new IgniteInternalException("Failed to start the connection manager: " + cause.getMessage(), cause);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IgniteInternalException("Interrupted while starting the connection manager", e);
}
}
/**
* Returns server local address.
*
* @return Server local address.
*/
public InetSocketAddress localAddress() {
return (InetSocketAddress) server.address();
}
/**
* Gets a {@link NettySender} that sends data from this node to another node with the specified address.
*
* @param consistentId Another node's consistent id.
* @param type Channel type.
* @param address Another node's address.
* @return Sender.
*/
public OrderingFuture<NettySender> channel(@Nullable String consistentId, ChannelType type, InetSocketAddress address) {
return getChannelWithRetry(consistentId, type, address, 0);
}
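/**
* Same as {@link #channel(String, ChannelType, InetSocketAddress)}, but retries (up to
* {@link #MAX_RETRIES_TO_OPEN_CHANNEL} times) when a {@link ChannelAlreadyExistsException} is encountered
* or when the obtained sender turns out to be already closed.
*/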
private OrderingFuture<NettySender> getChannelWithRetry(
@Nullable String consistentId,
ChannelType type,
InetSocketAddress address,
int attempt
) {
if (attempt > MAX_RETRIES_TO_OPEN_CHANNEL) {
return OrderingFuture.failedFuture(new IllegalStateException("Too many attempts to open channel to " + consistentId));
}
return doGetChannel(consistentId, type, address)
.handle((res, ex) -> {
if (ex instanceof ChannelAlreadyExistsException) {
return getChannelWithRetry(((ChannelAlreadyExistsException) ex).consistentId(), type, address, attempt + 1);
}
if (ex != null && ex.getCause() instanceof ChannelAlreadyExistsException) {
return getChannelWithRetry(
((ChannelAlreadyExistsException) ex.getCause()).consistentId(),
type,
address,
attempt + 1
);
}
if (ex != null) {
return OrderingFuture.<NettySender>failedFuture(ex);
}
assert res != null;
if (res.isOpen()) {
return OrderingFuture.completedFuture(res);
} else {
return getChannelWithRetry(res.consistentId(), type, address, attempt + 1);
}
})
.thenCompose(identity());
}
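/**
* Tries to find an existing open channel by consistent id (if it is known); otherwise gets or creates a
* {@link NettyClient} for the given address and returns the future of its sender.
*/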
private OrderingFuture<NettySender> doGetChannel(
@Nullable String consistentId,
ChannelType type,
InetSocketAddress address
) {
// We cannot look up a channel by consistent id if the consistent id is not known yet.
if (consistentId != null) {
// If consistent id is known, try looking up a channel by consistent id. There can be an outbound connection
// or an inbound connection associated with that consistent id.
NettySender channel = channels.compute(
new ConnectorKey<>(consistentId, type),
(key, sender) -> (sender == null || !sender.isOpen()) ? null : sender
);
if (channel != null) {
return OrderingFuture.completedFuture(channel);
}
}
// Get an existing client or create a new one. NettyClient provides a future that resolves
// when the client is ready for write operations, so a previously started client that hasn't yet established
// a connection or completed the handshake can be reused.
@Nullable NettyClient client = clients.compute(
new ConnectorKey<>(address, type),
(key, existingClient) -> isClientConnected(existingClient) ? existingClient : connect(key.id(), key.type())
);
if (client == null) {
return OrderingFuture.failedFuture(new NodeStoppingException("No outgoing connections are allowed as the node is stopping"));
}
return client.sender();
}
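/** Returns {@code true} if the given client exists, has not failed to connect, and has not been disconnected. */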
private static boolean isClientConnected(@Nullable NettyClient client) {
return client != null && !client.failedToConnect() && !client.isDisconnected();
}
/**
* Callback that is called upon receiving a new message.
*
* @param message New message.
*/
private void onMessage(InNetworkObject message) {
listeners.forEach(consumer -> consumer.accept(message));
}
/**
* Callback that is called when a handshake completes on a channel, either incoming or outgoing.
*
* @param channel The newly created channel.
*/
@Override
public void handshakeFinished(NettySender channel) {
ConnectorKey<String> key = new ConnectorKey<>(channel.consistentId(), getChannel(channel.channelId()));
NettySender oldChannel = channels.put(key, channel);
// An old channel can still be in the map, but it must have already been closed by the tie breaker in the
// handshake manager.
assert oldChannel == null || !oldChannel.isOpen() : "Incorrect channel creation flow";
// Prevent a race between calling handleNodeLeft() and putting a new channel that was just opened (to a node
// that is already stale). If the node is stale, the stale detector already knows it (and knew it before
// handleNodeLeft() got called, as it subscribes first).
// This is the only place where a new sender might be added to the map.
if (staleIdDetector.isIdStale(channel.launchId())) {
closeSenderAndDisposeDescriptor(channel, new RecipientLeftException());
channels.remove(key, channel);
} else if (stopping.get()) {
// Same thing as above, but for stopping this node instead of handling a 'node left' for another node.
closeSenderAndDisposeDescriptor(channel, new NodeStoppingException());
channels.remove(key, channel);
}
}
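/**
* Asynchronously closes the given sender, then blocks and disposes the corresponding recovery descriptor,
* failing its pending send futures with the given exception.
*/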
private void closeSenderAndDisposeDescriptor(NettySender sender, Exception exceptionToFailSendFutures) {
connectionMaintenanceExecutor.submit(() -> {
sender.closeAsync().whenCompleteAsync((res, ex) -> {
RecoveryDescriptor recoveryDescriptor = descriptorProvider.getRecoveryDescriptor(
sender.consistentId(),
sender.launchId(),
sender.channelId()
);
blockAndDisposeDescriptor(recoveryDescriptor, exceptionToFailSendFutures);
}, connectionMaintenanceExecutor);
});
}
/**
* Creates a new client from this node to the specified address.
*
* @param address Target address.
* @param channelType Channel type.
* @return New Netty client or {@code null} if the node is stopping.
*/
@Nullable
private NettyClient connect(InetSocketAddress address, ChannelType channelType) {
if (stopping.get()) {
return null;
}
var client = new NettyClient(
address,
serializationService,
createClientHandshakeManager(channelType.id()),
this::onMessage,
this.networkConfiguration.ssl()
);
client.start(clientBootstrap).whenComplete((sender, throwable) -> {
if (throwable != null) {
clients.remove(new ConnectorKey<>(address, channelType));
}
});
return client;
}
/**
* Add incoming message listener.
*
* @param listener Message listener.
*/
public void addListener(Consumer<InNetworkObject> listener) {
listeners.add(listener);
}
/**
* Stops the server and all clients.
*/
public void stop() {
boolean wasStopped = this.stopped.getAndSet(true);
if (wasStopped) {
return;
}
assert stopping.get();
//noinspection FuseStreamOperations
List<CompletableFuture<Void>> stopFutures = new ArrayList<>(clients.values().stream().map(NettyClient::stop).collect(toList()));
stopFutures.add(server.stop());
stopFutures.addAll(channels.values().stream().map(NettySender::closeAsync).collect(toList()));
stopFutures.add(disposeDescriptors());
CompletableFuture<Void> finalStopFuture = allOf(stopFutures.toArray(CompletableFuture<?>[]::new));
try {
finalStopFuture.get();
} catch (Exception e) {
LOG.warn("Failed to stop connection manager [reason={}]", e.getMessage());
}
IgniteUtils.shutdownAndAwaitTermination(connectionMaintenanceExecutor, 10, SECONDS);
}
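/** Blocks and disposes all recovery descriptors, failing their pending send futures with a {@link NodeStoppingException}. */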
private CompletableFuture<Void> disposeDescriptors() {
Exception exceptionToFailSendFutures = new NodeStoppingException();
Collection<RecoveryDescriptor> descriptors = descriptorProvider.getAllRecoveryDescriptors();
List<CompletableFuture<Void>> disposeFutures = new ArrayList<>(descriptors.size());
for (RecoveryDescriptor descriptor : descriptors) {
disposeFutures.add(blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures));
}
return allOf(disposeFutures.toArray(CompletableFuture[]::new));
}
/**
* Returns {@code true} if the connection manager is stopped or is being stopped, {@code false} otherwise.
*
* @return {@code true} if the connection manager is stopped or is being stopped, {@code false} otherwise.
*/
public boolean isStopped() {
return stopped.get();
}
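/** Creates a handshake manager for an outgoing connection, using the injected factory if one was provided. */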
private HandshakeManager createClientHandshakeManager(short connectionId) {
ClusterNode localNode = Objects.requireNonNull(localNodeFuture.getNow(null), "localNode not set");
if (clientHandshakeManagerFactory == null) {
return new RecoveryClientHandshakeManager(
localNode,
connectionId,
descriptorProvider,
bootstrapFactory,
staleIdDetector,
clusterIdSupplier,
this,
stopping::get,
failureManager
);
}
return clientHandshakeManagerFactory.create(
localNode,
connectionId,
descriptorProvider
);
}
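/** Creates a handshake manager for an incoming connection; waits for the local node to be set first. */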
private HandshakeManager createServerHandshakeManager() {
// We don't just use localNodeFuture.join() here: the wait must be time-limited.
waitForLocalNodeToBeSet();
return new RecoveryServerHandshakeManager(
localNodeFuture.join(),
FACTORY,
descriptorProvider,
bootstrapFactory,
staleIdDetector,
clusterIdSupplier,
this,
stopping::get,
failureManager
);
}
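/** Waits up to 10 seconds for the local node to be set via {@link #setLocalNode(ClusterNode)}. */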
private void waitForLocalNodeToBeSet() {
try {
localNodeFuture.get(10, SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for local node to be set", e);
} catch (ExecutionException | TimeoutException e) {
throw new RuntimeException("Could not finish awaiting for local node", e);
}
}
/**
* Returns connection manager's {@link #server}.
*
* @return Connection manager's {@link #server}.
*/
@TestOnly
public NettyServer server() {
return server;
}
@TestOnly
public SerializationService serializationService() {
return serializationService;
}
/**
* Returns this node's consistent id.
*
* @return This node's consistent id.
*/
public String consistentId() {
return consistentId;
}
/**
* Returns collection of all the clients started by this connection manager.
*
* @return Collection of all the clients started by this connection manager.
*/
@TestOnly
public Collection<NettyClient> clients() {
return Collections.unmodifiableCollection(clients.values());
}
/**
* Returns a map of all channels of this connection manager.
*
* @return Map of all channels of this connection manager.
*/
@TestOnly
public Map<ConnectorKey<String>, NettySender> channels() {
return Map.copyOf(channels);
}
/**
* Marks this connection manager as being stopped. In this state, it does not make any new connections, does not accept
* incoming connections, and does not treat handshake rejections as critical events.
*/
public void initiateStopping() {
stopping.set(true);
}
/**
* Closes physical connections with an Ignite node identified by the given ID (not its consistentId, but the
* launch ID that gets regenerated at each node restart), along with the recovery descriptors corresponding to it.
*
* @param id ID of the node (it must have already left the topology).
*/
public void handleNodeLeft(UUID id) {
// We rely on the fact that the node with the given ID has already left the physical topology.
assert staleIdDetector.isIdStale(id) : id + " is not stale yet";
// TODO: IGNITE-21207 - remove descriptors for good.
connectionMaintenanceExecutor.execute(
() -> closeChannelsWith(id).whenCompleteAsync((res, ex) -> {
// Closing descriptors separately (as some of them might not have an operating channel attached, but they
// still might have unacknowledged messages/futures).
disposeRecoveryDescriptorsOfLeftNode(id);
}, connectionMaintenanceExecutor)
);
}
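/** Closes and removes all channels whose remote launch ID matches the given ID. */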
private CompletableFuture<Void> closeChannelsWith(UUID id) {
List<Entry<ConnectorKey<String>, NettySender>> entriesToRemove = channels.entrySet().stream()
.filter(entry -> entry.getValue().launchId().equals(id))
.collect(toList());
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
for (Entry<ConnectorKey<String>, NettySender> entry : entriesToRemove) {
closeFutures.add(entry.getValue().closeAsync());
channels.remove(entry.getKey());
}
return allOf(closeFutures.toArray(CompletableFuture[]::new));
}
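/** Blocks and disposes all recovery descriptors of the left node, failing their pending send futures with a {@link RecipientLeftException}. */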
private void disposeRecoveryDescriptorsOfLeftNode(UUID id) {
Exception exceptionToFailSendFutures = new RecipientLeftException();
for (RecoveryDescriptor descriptor : descriptorProvider.getRecoveryDescriptorsByLaunchId(id)) {
blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures);
}
}
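/**
* Blocks the given descriptor forever and disposes it, failing its pending send futures with the given exception.
* If the descriptor is currently held by someone else, closes the holder's channel and retries asynchronously;
* if it is already blocked forever, does nothing (whoever blocked it handles the disposal).
*/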
private CompletableFuture<Void> blockAndDisposeDescriptor(RecoveryDescriptor descriptor, Exception exceptionToFailSendFutures) {
while (!descriptor.tryBlockForever(exceptionToFailSendFutures)) {
if (descriptor.isBlockedForever()) {
// Already blocked concurrently; nothing to do here, as the one who blocked it will handle the disposal (or already has).
return nullCompletedFuture();
}
DescriptorAcquiry acquiry = descriptor.holder();
if (acquiry == null) {
// The descriptor was acquired when we tried to block it, but it has since been released. Let's try blocking it again.
continue;
}
// Could not block: someone has not released the descriptor yet (but will do so soon). Close their channel (just in case)
// and try again.
Channel channel = acquiry.channel();
assert channel != null;
return NettyUtils.toCompletableFuture(channel.close())
.handleAsync(
(res, e) -> blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures),
connectionMaintenanceExecutor
)
.thenCompose(identity());
}
// Now we hold the acquiry, so no one else can touch the unacknowledged queue, and there is a happens-before
// relationship between all previous operations on this queue and our current actions.
descriptor.dispose(exceptionToFailSendFutures);
return nullCompletedFuture();
}
/**
* Sets the local node. Only after this does the manager become able to accept incoming connections.
*/
public void setLocalNode(ClusterNode localNode) {
localNodeFuture.complete(localNode);
}
}