blob: 8978c9143bf179110721a54e38335385295edefc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.network.netty;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Network.PORT_IN_USE_ERR;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.network.ssl.SslContextProvider;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
 * Netty server channel wrapper.
 *
 * <p>Owns the lifecycle of a single listening channel: {@link #start()} binds it and {@link #stop()} closes it.
 * Both transitions are serialized on an internal lock. An instance can be started at most once and cannot be
 * started after it has been stopped.
 */
public class NettyServer {
    /** A lock for start and stop operations. */
    private final Object startStopLock = new Object();

    /** Bootstrap factory. */
    private final NettyBootstrapFactory bootstrapFactory;

    /** Server socket configuration. */
    private final NetworkView configuration;

    /** Serialization service. */
    private final SerializationService serializationService;

    /** Incoming message listener. */
    private final Consumer<InNetworkObject> messageListener;

    /** Handshake manager. */
    private final Supplier<HandshakeManager> handshakeManager;

    /** Server start future, {@code null} until {@link #start()} has been called. Guarded by {@link #startStopLock}. */
    @Nullable
    private CompletableFuture<Void> serverStartFuture;

    /** Server socket channel, {@code null} until the bind attempt has succeeded. */
    @Nullable
    private volatile ServerChannel channel;

    /** Server close future, {@code null} until the bind attempt has succeeded. Guarded by {@link #startStopLock}. */
    @Nullable
    private CompletableFuture<Void> serverCloseFuture;

    /** Flag indicating if {@link #stop()} has been called. Guarded by {@link #startStopLock}. */
    private boolean stopped;

    /**
     * Constructor.
     *
     * @param configuration Server configuration.
     * @param handshakeManager Handshake manager supplier.
     * @param messageListener Message listener.
     * @param serializationService Serialization service.
     * @param bootstrapFactory Netty bootstrap factory.
     */
    public NettyServer(
            NetworkView configuration,
            Supplier<HandshakeManager> handshakeManager,
            Consumer<InNetworkObject> messageListener,
            SerializationService serializationService,
            NettyBootstrapFactory bootstrapFactory
    ) {
        this.configuration = configuration;
        this.handshakeManager = handshakeManager;
        this.messageListener = messageListener;
        this.serializationService = serializationService;
        this.bootstrapFactory = bootstrapFactory;
    }

    /**
     * Starts the server.
     *
     * <p>The returned future completes exceptionally with an {@link IgniteException} carrying the
     * {@code PORT_IN_USE_ERR} code if the bind fails, with a {@link CancellationException} if {@link #stop()} was
     * called before the bind finished, and is cancelled if the underlying bind operation was cancelled.
     *
     * @return Future that resolves when the server is successfully started.
     * @throws IgniteInternalException If the server has already been stopped or has already been started.
     */
    public CompletableFuture<Void> start() {
        synchronized (startStopLock) {
            if (stopped) {
                throw new IgniteInternalException("Attempted to start an already stopped server");
            }

            if (serverStartFuture != null) {
                throw new IgniteInternalException("Attempted to start an already started server");
            }

            ServerBootstrap bootstrap = bootstrapFactory.createServerBootstrap();

            bootstrap.childHandler(newChildInitializer());

            serverStartFuture = bind(bootstrap)
                    .handle((boundChannel, err) -> {
                        // The callback may run on another thread, so the shared state is published under the lock.
                        synchronized (startStopLock) {
                            if (boundChannel != null) {
                                serverCloseFuture = NettyUtils.toCompletableFuture(boundChannel.closeFuture());
                            }

                            this.channel = (ServerChannel) boundChannel;

                            if (err != null || stopped) {
                                // A successful bind that raced with stop() is still reported as a failed start;
                                // stop() is responsible for closing the channel in that case.
                                Throwable stopErr = err != null ? err : new CancellationException("Server was stopped");

                                return CompletableFuture.<Void>failedFuture(stopErr);
                            } else {
                                return CompletableFutures.<Void>nullCompletedFuture();
                            }
                        }
                    })
                    .thenCompose(Function.identity());

            return serverStartFuture;
        }
    }

    /** Creates the initializer that sets up the pipeline of every accepted connection. */
    private ChannelInitializer<SocketChannel> newChildInitializer() {
        return new ChannelInitializer<SocketChannel>() {
            /** {@inheritDoc} */
            @Override
            public void initChannel(SocketChannel ch) {
                var sessionSerializationService = new PerSessionSerializationService(serializationService);

                // Get handshake manager for the new channel.
                HandshakeManager manager = handshakeManager.get();

                if (configuration.ssl().enabled()) {
                    SslContext sslContext = SslContextProvider.createServerSslContext(configuration.ssl());

                    PipelineUtils.setup(ch.pipeline(), sessionSerializationService, manager, messageListener, sslContext);
                } else {
                    PipelineUtils.setup(ch.pipeline(), sessionSerializationService, manager, messageListener);
                }
            }
        };
    }

    /** Binds the bootstrap to the configured address and port and adapts the Netty future to a {@link CompletableFuture}. */
    private CompletableFuture<Channel> bind(ServerBootstrap bootstrap) {
        int port = configuration.port();
        String address = configuration.listenAddress();

        var bindFuture = new CompletableFuture<Channel>();

        // An empty listen address means that only the port is passed to the bootstrap.
        ChannelFuture channelFuture = address.isEmpty() ? bootstrap.bind(port) : bootstrap.bind(address, port);

        channelFuture.addListener((ChannelFuture future) -> {
            if (future.isSuccess()) {
                bindFuture.complete(future.channel());
            } else if (future.isCancelled()) {
                bindFuture.cancel(true);
            } else {
                String errorMessage = address.isEmpty()
                        ? "Port " + port + " is not available."
                        : String.format("Address %s:%d is not available", address, port);

                bindFuture.completeExceptionally(new IgniteException(PORT_IN_USE_ERR, errorMessage, future.cause()));
            }
        });

        return bindFuture;
    }

    /**
     * Returns the local address of the server.
     *
     * @return Local address the server channel is bound to.
     * @throws NullPointerException If the server channel is not available yet.
     */
    public SocketAddress address() {
        return Objects.requireNonNull(channel, "Not started yet").localAddress();
    }

    /**
     * Stops the server.
     *
     * <p>If the server was never started, or this method has already been called, an already completed future is
     * returned. Otherwise the channel is closed once the start attempt has finished, whatever its outcome.
     *
     * @return Future that is resolved when the server's channel has closed or an already completed future for a subsequent call.
     */
    public CompletableFuture<Void> stop() {
        synchronized (startStopLock) {
            if (stopped) {
                return nullCompletedFuture();
            }

            stopped = true;

            if (serverStartFuture == null) {
                return nullCompletedFuture();
            }

            return serverStartFuture.handle((unused, throwable) -> {
                ServerChannel channel0;
                CompletableFuture<Void> closeFuture;

                // Read the state only now, after the start attempt has finished: when stop() is called while the bind
                // is still in flight, the close future does not exist yet at call time, and a snapshot taken then
                // would make the returned future complete before the channel is actually closed.
                synchronized (startStopLock) {
                    channel0 = channel;
                    closeFuture = serverCloseFuture;
                }

                if (channel0 != null) {
                    channel0.close();
                }

                return closeFuture == null ? CompletableFutures.<Void>nullCompletedFuture() : closeFuture;
            }).thenCompose(Function.identity());
        }
    }

    /**
     * Returns {@code true} if the server is running, {@code false} otherwise.
     *
     * @return {@code true} if the server is running, {@code false} otherwise.
     */
    @TestOnly
    public boolean isRunning() {
        var channel0 = channel;

        return channel0 != null && channel0.isOpen();
    }
}