IGNITE-14009 Fixes flaky PubSubStreamerSelfTest. (#38)

diff --git a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
index 1cbd010..2c6acfc 100644
--- a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
+++ b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
@@ -34,9 +34,10 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 
 import org.jetbrains.annotations.NotNull;
 import org.mockito.Mockito;
@@ -71,11 +72,14 @@
     /** */
     public static final int MESSAGES_PER_REQUEST = 10;
 
+    /** Time to wait for the message in milliseconds. */
+    private static final long MSG_WAIT_TIMEOUT = 1_000L;
+
     /** */
     private final Map<String, Publisher> publishers = new HashMap<>();
 
     /** */
-    private final Queue<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>();
+    private final BlockingDeque<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>();
 
     public SubscriberStubSettings createSubscriberStub() throws IOException {
         CredentialsProvider credentialsProvider = NoCredentialsProvider.create();
@@ -136,8 +140,18 @@
     private void pullMessages(ClientCall.Listener<PullResponse> listener, Metadata metadata) {
         PullResponse.Builder pullResponse = PullResponse.newBuilder();
 
-        for(int i = 0; i < MESSAGES_PER_REQUEST; i++) {
-            pullResponse.addReceivedMessages(ReceivedMessage.newBuilder().mergeMessage(blockingQueue.remove()).build());
+        try {
+            for (int i = 0; i < MESSAGES_PER_REQUEST; i++) {
+                PubsubMessage msg = blockingQueue.poll(MSG_WAIT_TIMEOUT, TimeUnit.MILLISECONDS);
+
+                if (msg == null)
+                    break;
+
+                pullResponse.addReceivedMessages(ReceivedMessage.newBuilder().mergeMessage(msg).build());
+            }
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
         }
 
         listener.onMessage(pullResponse.build());
diff --git a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java
index 300efe3..b286b94 100644
--- a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java
+++ b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java
@@ -180,9 +180,6 @@
 
             pubSubStmr.setSingleTupleExtractor(singleTupleExtractor());
 
-            // Start Pub/Sub streamer.
-            pubSubStmr.start();
-
             final CountDownLatch latch = new CountDownLatch(CNT);
 
             IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
@@ -210,6 +207,9 @@
 
             ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
 
+            // Start Pub/Sub streamer.
+            pubSubStmr.start();
+
             // Checks all events successfully processed in 10 seconds.
             assertTrue("Failed to wait latch completion, still wait " + latch.getCount() + " events",
                 latch.await(10, TimeUnit.SECONDS));