| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.network.internal.netty; |
| |
| import java.net.SocketAddress; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.BiConsumer; |
| import java.util.function.Supplier; |
| import java.util.stream.Stream; |
| import io.netty.bootstrap.Bootstrap; |
| import io.netty.channel.ChannelOption; |
| import io.netty.channel.EventLoopGroup; |
| import io.netty.channel.nio.NioEventLoopGroup; |
| import io.netty.channel.socket.nio.NioSocketChannel; |
| import org.apache.ignite.lang.IgniteInternalException; |
| import org.apache.ignite.lang.IgniteLogger; |
| import org.apache.ignite.network.NetworkMessage; |
| import org.apache.ignite.network.internal.handshake.HandshakeManager; |
| import org.apache.ignite.network.serialization.MessageSerializationRegistry; |
| import org.jetbrains.annotations.Nullable; |
| import org.jetbrains.annotations.TestOnly; |
| |
| /** |
| * Class that manages connections both incoming and outgoing. |
| */ |
| public class ConnectionManager { |
| /** Logger. */ |
| private static final IgniteLogger LOG = IgniteLogger.forClass(ConnectionManager.class); |
| |
| /** Latest version of the direct marshalling protocol. */ |
| public static final byte DIRECT_PROTOCOL_VERSION = 1; |
| |
| /** Client bootstrap. */ |
| private final Bootstrap clientBootstrap; |
| |
| /** Client socket channel handler event loop group. */ |
| private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup(); |
| |
| /** Server. */ |
| private final NettyServer server; |
| |
| /** Channels map from consistentId to {@link NettySender}. */ |
| private final Map<String, NettySender> channels = new ConcurrentHashMap<>(); |
| |
| /** Clients. */ |
| private final Map<SocketAddress, NettyClient> clients = new ConcurrentHashMap<>(); |
| |
| /** Serialization registry. */ |
| private final MessageSerializationRegistry serializationRegistry; |
| |
| /** Message listeners. */ |
| private final List<BiConsumer<SocketAddress, NetworkMessage>> listeners = new CopyOnWriteArrayList<>(); |
| |
| /** Node consistent id. */ |
| private final String consistentId; |
| |
| /** Client handshake manager factory. */ |
| private final Supplier<HandshakeManager> clientHandshakeManagerFactory; |
| |
| /** |
| * Constructor. |
| * |
| * @param port Server port. |
| * @param registry Serialization registry. |
| * @param consistentId Consistent id of this node. |
| * @param serverHandshakeManagerFactory Server handshake manager factory. |
| * @param clientHandshakeManagerFactory Client handshake manager factory. |
| */ |
| public ConnectionManager( |
| int port, |
| MessageSerializationRegistry registry, |
| String consistentId, |
| Supplier<HandshakeManager> serverHandshakeManagerFactory, |
| Supplier<HandshakeManager> clientHandshakeManagerFactory |
| ) { |
| this.serializationRegistry = registry; |
| this.consistentId = consistentId; |
| this.clientHandshakeManagerFactory = clientHandshakeManagerFactory; |
| this.server = new NettyServer( |
| port, |
| serverHandshakeManagerFactory.get(), |
| this::onNewIncomingChannel, |
| this::onMessage, |
| serializationRegistry |
| ); |
| this.clientBootstrap = createClientBootstrap(clientWorkerGroup, serializationRegistry); |
| } |
| |
| /** |
| * Starts the server. |
| * |
| * @throws IgniteInternalException If failed to start. |
| */ |
| public void start() throws IgniteInternalException { |
| try { |
| server.start().join(); |
| } |
| catch (CompletionException e) { |
| Throwable cause = e.getCause(); |
| throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause); |
| } |
| } |
| |
| /** |
| * @return Server local address. |
| */ |
| public SocketAddress getLocalAddress() { |
| return server.address(); |
| } |
| |
| /** |
| * Gets a {@link NettySender}, that sends data from this node to another node with the specified address. |
| * @param consistentId Another node's consistent id. |
| * @param address Another node's address. |
| * @return Sender. |
| */ |
| public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) { |
| if (consistentId != null) { |
| // If consistent id is known, try looking up a channel by consistent id. There can be an outbound connection |
| // or an inbound connection associated with that consistent id. |
| NettySender channel = channels.compute( |
| consistentId, |
| (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender |
| ); |
| |
| if (channel != null) |
| return CompletableFuture.completedFuture(channel); |
| } |
| |
| // Get an existing client or create a new one. NettyClient provides a CompletableFuture that resolves |
| // when the client is ready for write operations, so previously started client, that didn't establish connection |
| // or didn't perform the handhsake operaton, can be reused. |
| NettyClient client = clients.compute(address, (addr, existingClient) -> |
| existingClient != null && !existingClient.failedToConnect() && !existingClient.isDisconnected() ? |
| existingClient : connect(addr) |
| ); |
| |
| CompletableFuture<NettySender> sender = client.sender(); |
| |
| assert sender != null; |
| |
| return sender; |
| } |
| |
| /** |
| * Callback that is called upon receiving a new message. |
| * |
| * @param from Source of the message. |
| * @param message New message. |
| */ |
| private void onMessage(SocketAddress from, NetworkMessage message) { |
| listeners.forEach(consumer -> consumer.accept(from, message)); |
| } |
| |
| /** |
| * Callback that is called upon new client connected to the server. |
| * |
| * @param channel Channel from client to this {@link #server}. |
| */ |
| private void onNewIncomingChannel(NettySender channel) { |
| channels.put(channel.consistentId(), channel); |
| } |
| |
| /** |
| * Create new client from this node to specified address. |
| * |
| * @param address Target address. |
| * @return New netty client. |
| */ |
| private NettyClient connect( |
| SocketAddress address |
| ) { |
| var client = new NettyClient( |
| address, |
| serializationRegistry, |
| clientHandshakeManagerFactory.get(), |
| this::onMessage |
| ); |
| |
| client.start(clientBootstrap).whenComplete((sender, throwable) -> { |
| if (throwable == null) |
| channels.put(sender.consistentId(), sender); |
| else |
| clients.remove(address); |
| }); |
| |
| return client; |
| } |
| |
| /** |
| * Add incoming message listener. |
| * |
| * @param listener Message listener. |
| */ |
| public void addListener(BiConsumer<SocketAddress, NetworkMessage> listener) { |
| listeners.add(listener); |
| } |
| |
| /** |
| * Stops the server and all clients. |
| */ |
| public void stop() { |
| Stream<CompletableFuture<Void>> stream = Stream.concat( |
| clients.values().stream().map(NettyClient::stop), |
| Stream.of(server.stop()) |
| ); |
| |
| CompletableFuture<Void> stopFut = CompletableFuture.allOf(stream.toArray(CompletableFuture<?>[]::new)); |
| |
| try { |
| stopFut.join(); |
| // TODO: IGNITE-14538 quietPeriod and timeout should be configurable. |
| clientWorkerGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS).sync(); |
| } |
| catch (Exception e) { |
| LOG.warn("Failed to stop the ConnectionManager: " + e.getMessage()); |
| } |
| } |
| |
| /** |
| * @return Connection manager's {@link #server}. |
| */ |
| @TestOnly |
| public NettyServer server() { |
| return server; |
| } |
| |
| /** |
| * @return This node's consistent id. |
| */ |
| @TestOnly |
| public String consistentId() { |
| return consistentId; |
| } |
| |
| /** |
| * @return Collection of all the clients started by this connection manager. |
| */ |
| @TestOnly |
| public Collection<NettyClient> clients() { |
| return Collections.unmodifiableCollection(clients.values()); |
| } |
| |
| |
| /** |
| * @return Map of the channels. |
| */ |
| @TestOnly |
| public Map<String, NettySender> channels() { |
| return Collections.unmodifiableMap(channels); |
| } |
| |
| /** |
| * Creates a {@link Bootstrap} for clients, providing channel handlers and options. |
| * |
| * @param eventLoopGroup Event loop group for channel handling. |
| * @param serializationRegistry Serialization registry. |
| * @return Bootstrap for clients. |
| */ |
| public static Bootstrap createClientBootstrap( |
| EventLoopGroup eventLoopGroup, |
| MessageSerializationRegistry serializationRegistry |
| ) { |
| Bootstrap clientBootstrap = new Bootstrap(); |
| |
| clientBootstrap.group(eventLoopGroup) |
| .channel(NioSocketChannel.class) |
| // See NettyServer#start for netty configuration details. |
| .option(ChannelOption.SO_KEEPALIVE, true) |
| .option(ChannelOption.SO_LINGER, 0) |
| .option(ChannelOption.TCP_NODELAY, true); |
| |
| return clientBootstrap; |
| } |
| } |