IGNITE-15309 Fix nodes hanging on stop (#280)
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index f641f2b..72d9b9c 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -35,7 +35,6 @@
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
@@ -133,8 +132,7 @@
return Mono.defer(() -> {
LOG.info("Stopping {}", address);
- // Fail all incoming message listeners on stop
- sink.error(new NodeStoppingException());
+ sink.complete();
LOG.info("Stopped {}", address);
return Mono.empty();
@@ -155,7 +153,10 @@
/** {@inheritDoc} */
@Override public Mono<Void> stop() {
- return doStop();
+ return Mono.defer(() -> {
+ stop.onComplete();
+ return onStop;
+ });
}
/** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index e325004..17f9122 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -31,6 +31,9 @@
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.NetworkMessageHandler;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
/**
* Implementation of {@link MessagingService} based on ScaleCube.
*/
@@ -77,7 +80,7 @@
// TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
// TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
if (cluster.isShutdown())
- return CompletableFuture.failedFuture(new NodeStoppingException());
+ return failedFuture(new NodeStoppingException());
return cluster
.send(fromNetworkAddress(recipient.address()), Message.fromData(msg))
@@ -94,7 +97,7 @@
// TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
// TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
if (cluster.isShutdown())
- return CompletableFuture.failedFuture(new NodeStoppingException());
+ return failedFuture(new NodeStoppingException());
var message = Message
.withData(msg)
@@ -116,7 +119,7 @@
// TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
// TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
if (cluster.isShutdown())
- return CompletableFuture.failedFuture(new NodeStoppingException());
+ return failedFuture(new NodeStoppingException());
var message = Message
.withData(msg)
@@ -127,6 +130,7 @@
.requestResponse(fromNetworkAddress(addr), message)
.timeout(Duration.ofMillis(timeout))
.toFuture()
+ .thenCompose(m -> m == null ? failedFuture(new NodeStoppingException()) : completedFuture(m))
.thenApply(Message::data);
}