blob: 2a77bfe72017176575872279c10be396d87a370f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.network.netty;
import static org.apache.ignite.network.ChannelType.getChannel;
import io.netty.bootstrap.Bootstrap;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
import org.apache.ignite.internal.network.recovery.StaleIdDetector;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ChannelType;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
* Class that manages connections both incoming and outgoing.
*/
public class ConnectionManager implements ChannelCreationListener {
/** Message factory. */
private static final NetworkMessagesFactory FACTORY = new NetworkMessagesFactory();
/** Logger. */
private static final IgniteLogger LOG = Loggers.forClass(ConnectionManager.class);
/** Latest version of the direct marshalling protocol. */
public static final byte DIRECT_PROTOCOL_VERSION = 1;
/** Client bootstrap. */
private final Bootstrap clientBootstrap;
/** Server. */
private final NettyServer server;
/** Channels map from consistentId to {@link NettySender}. */
private final Map<ConnectorKey<String>, NettySender> channels = new ConcurrentHashMap<>();
/** Clients. */
private final Map<ConnectorKey<InetSocketAddress>, NettyClient> clients = new ConcurrentHashMap<>();
/** Serialization service. */
private final SerializationService serializationService;
/** Message listeners. */
private final List<Consumer<InNetworkObject>> listeners = new CopyOnWriteArrayList<>();
/** Node consistent id. */
private final String consistentId;
/** Node launch id. As opposed to {@link #consistentId}, this identifier changes between restarts. */
private final UUID launchId;
/** Used to detect that a peer uses a stale ID. */
private final StaleIdDetector staleIdDetector;
/** Factory producing {@link RecoveryClientHandshakeManager} instances. */
private final @Nullable RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactory;
/** Start flag. */
private final AtomicBoolean started = new AtomicBoolean(false);
/** Stop flag. */
private final AtomicBoolean stopped = new AtomicBoolean(false);
/** Recovery descriptor provider. */
private final RecoveryDescriptorProvider descriptorProvider = new DefaultRecoveryDescriptorProvider();
/** Network Configuration. */
private final NetworkView networkConfiguration;
/**
* Constructor.
*
* @param networkConfiguration Network configuration.
* @param serializationService Serialization service.
* @param launchId Launch id of this node.
* @param consistentId Consistent id of this node.
* @param bootstrapFactory Bootstrap factory.
* @param staleIdDetector Detects stale member IDs.
*/
public ConnectionManager(
NetworkView networkConfiguration,
SerializationService serializationService,
UUID launchId,
String consistentId,
NettyBootstrapFactory bootstrapFactory,
StaleIdDetector staleIdDetector
) {
this(
networkConfiguration,
serializationService,
launchId,
consistentId,
bootstrapFactory,
staleIdDetector,
null
);
}
/**
* Constructor.
*
* @param networkConfiguration Network configuration.
* @param serializationService Serialization service.
* @param launchId Launch id of this node.
* @param consistentId Consistent id of this node.
* @param bootstrapFactory Bootstrap factory.
* @param staleIdDetector Detects stale member IDs.
* @param clientHandshakeManagerFactory Factory for {@link RecoveryClientHandshakeManager} instances.
*/
public ConnectionManager(
NetworkView networkConfiguration,
SerializationService serializationService,
UUID launchId,
String consistentId,
NettyBootstrapFactory bootstrapFactory,
StaleIdDetector staleIdDetector,
@Nullable RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactory
) {
this.serializationService = serializationService;
this.launchId = launchId;
this.consistentId = consistentId;
this.staleIdDetector = staleIdDetector;
this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
this.networkConfiguration = networkConfiguration;
this.server = new NettyServer(
networkConfiguration,
this::createServerHandshakeManager,
this::onMessage,
serializationService,
bootstrapFactory
);
this.clientBootstrap = bootstrapFactory.createClientBootstrap();
}
/**
* Starts the server.
*
* @throws IgniteInternalException If failed to start.
*/
public void start() throws IgniteInternalException {
try {
boolean wasStarted = started.getAndSet(true);
if (wasStarted) {
throw new IgniteInternalException("Attempted to start an already started connection manager");
}
if (stopped.get()) {
throw new IgniteInternalException("Attempted to start an already stopped connection manager");
}
server.start().get();
LOG.info("Server started [address={}]", server.address());
} catch (ExecutionException e) {
Throwable cause = e.getCause();
throw new IgniteInternalException("Failed to start the connection manager: " + cause.getMessage(), cause);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IgniteInternalException("Interrupted while starting the connection manager", e);
}
}
/**
* Returns server local address.
*
* @return Server local address.
*/
public InetSocketAddress localAddress() {
return (InetSocketAddress) server.address();
}
/**
* Gets a {@link NettySender}, that sends data from this node to another node with the specified address.
*
* @param consistentId Another node's consistent id.
* @param address Another node's address.
* @return Sender.
*/
public OrderingFuture<NettySender> channel(@Nullable String consistentId, ChannelType type, InetSocketAddress address) {
// Problem is we can't look up a channel by consistent id because consistent id is not known yet.
if (consistentId != null) {
// If consistent id is known, try looking up a channel by consistent id. There can be an outbound connection
// or an inbound connection associated with that consistent id.
NettySender channel = channels.compute(
new ConnectorKey<>(consistentId, type),
(key, sender) -> (sender == null || !sender.isOpen()) ? null : sender
);
if (channel != null) {
return OrderingFuture.completedFuture(channel);
}
}
// Get an existing client or create a new one. NettyClient provides a CompletableFuture that resolves
// when the client is ready for write operations, so previously started client, that didn't establish connection
// or didn't perform the handshake operation, can be reused.
NettyClient client = clients.compute(
new ConnectorKey<>(address, type),
(key, existingClient) -> isClientConnected(existingClient) ? existingClient : connect(key.id(), key.type())
);
return client.sender();
}
private static boolean isClientConnected(@Nullable NettyClient client) {
return client != null && !client.failedToConnect() && !client.isDisconnected();
}
/**
* Callback that is called upon receiving a new message.
*
* @param message New message.
*/
private void onMessage(InNetworkObject message) {
listeners.forEach(consumer -> consumer.accept(message));
}
/**
* Callback that is called upon new client connected to the server.
*
* @param channel Channel from client to this {@link #server}.
*/
@Override
public void handshakeFinished(NettySender channel) {
ConnectorKey<String> key = new ConnectorKey<>(channel.consistentId(), getChannel(channel.channelId()));
NettySender oldChannel = channels.put(key, channel);
// Old channel can still be in the map, but it must be closed already by the tie breaker in the
// handshake manager.
assert oldChannel == null || !oldChannel.isOpen() : "Incorrect channel creation flow";
}
/**
* Create new client from this node to specified address.
*
* @param address Target address.
* @return New netty client.
*/
private NettyClient connect(InetSocketAddress address, ChannelType channelType) {
var client = new NettyClient(
address,
serializationService,
createClientHandshakeManager(channelType.id()),
this::onMessage,
this.networkConfiguration.ssl()
);
client.start(clientBootstrap).whenComplete((sender, throwable) -> {
if (throwable != null) {
clients.remove(new ConnectorKey<>(address, channelType));
}
});
return client;
}
/**
* Add incoming message listener.
*
* @param listener Message listener.
*/
public void addListener(Consumer<InNetworkObject> listener) {
listeners.add(listener);
}
/**
* Stops the server and all clients.
*/
public void stop() {
boolean wasStopped = this.stopped.getAndSet(true);
if (wasStopped) {
return;
}
Stream<CompletableFuture<Void>> stream = Stream.concat(Stream.concat(
clients.values().stream().map(NettyClient::stop),
Stream.of(server.stop())
),
channels.values().stream().map(NettySender::closeAsync)
);
CompletableFuture<Void> stopFut = CompletableFuture.allOf(stream.toArray(CompletableFuture<?>[]::new));
try {
stopFut.join();
} catch (Exception e) {
LOG.warn("Failed to stop connection manager [reason={}]", e.getMessage());
}
}
/**
* Returns {@code true} if the connection manager is stopped or is being stopped, {@code false} otherwise.
*
* @return {@code true} if the connection manager is stopped or is being stopped, {@code false} otherwise.
*/
public boolean isStopped() {
return stopped.get();
}
private HandshakeManager createClientHandshakeManager(short connectionId) {
if (clientHandshakeManagerFactory == null) {
return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, descriptorProvider, staleIdDetector, this);
}
return clientHandshakeManagerFactory.create(
launchId,
consistentId,
connectionId,
descriptorProvider
);
}
private HandshakeManager createServerHandshakeManager() {
return new RecoveryServerHandshakeManager(launchId, consistentId, FACTORY, descriptorProvider, staleIdDetector, this);
}
/**
* Returns connection manager's {@link #server}.
*
* @return Connection manager's {@link #server}.
*/
@TestOnly
public NettyServer server() {
return server;
}
@TestOnly
public SerializationService serializationService() {
return serializationService;
}
/**
* Returns this node's consistent id.
*
* @return This node's consistent id.
*/
public String consistentId() {
return consistentId;
}
/**
* Returns collection of all the clients started by this connection manager.
*
* @return Collection of all the clients started by this connection manager.
*/
@TestOnly
public Collection<NettyClient> clients() {
return Collections.unmodifiableCollection(clients.values());
}
/**
* Returns collection of all channels of this connection manager.
*
* @return Collection of all channels of this connection manager.
*/
@TestOnly
public Map<ConnectorKey<String>, NettySender> channels() {
return Map.copyOf(channels);
}
}