IGNITE-15309 Fix nodes hanging on stop (#280)

diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index f641f2b..72d9b9c 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -35,7 +35,6 @@
 import org.apache.ignite.internal.network.netty.ConnectionManager;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
@@ -133,8 +132,7 @@
         return Mono.defer(() -> {
             LOG.info("Stopping {}", address);
 
-            // Fail all incoming message listeners on stop
-            sink.error(new NodeStoppingException());
+            sink.complete();
 
             LOG.info("Stopped {}", address);
             return Mono.empty();
@@ -155,7 +153,10 @@
 
     /** {@inheritDoc} */
     @Override public Mono<Void> stop() {
-        return doStop();
+        return Mono.defer(() -> {
+            stop.onComplete();
+            return onStop;
+        });
     }
 
     /** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index e325004..17f9122 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -31,6 +31,9 @@
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.NetworkMessageHandler;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
 /**
  * Implementation of {@link MessagingService} based on ScaleCube.
  */
@@ -77,7 +80,7 @@
         // TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
         // TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
         if (cluster.isShutdown())
-            return CompletableFuture.failedFuture(new NodeStoppingException());
+            return failedFuture(new NodeStoppingException());
 
         return cluster
             .send(fromNetworkAddress(recipient.address()), Message.fromData(msg))
@@ -94,7 +97,7 @@
         // TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
         // TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
         if (cluster.isShutdown())
-            return CompletableFuture.failedFuture(new NodeStoppingException());
+            return failedFuture(new NodeStoppingException());
 
         var message = Message
             .withData(msg)
@@ -116,7 +119,7 @@
         // TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
         // TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
         if (cluster.isShutdown())
-            return CompletableFuture.failedFuture(new NodeStoppingException());
+            return failedFuture(new NodeStoppingException());
 
         var message = Message
             .withData(msg)
@@ -127,6 +130,7 @@
             .requestResponse(fromNetworkAddress(addr), message)
             .timeout(Duration.ofMillis(timeout))
             .toFuture()
+            .thenCompose(m -> m == null ? failedFuture(new NodeStoppingException()) : completedFuture(m))
             .thenApply(Message::data);
     }