IGNITE-14009 Fixes flaky PubSubStreamerSelfTest. (#38)
diff --git a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
index 1cbd010..2c6acfc 100644
--- a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
+++ b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
@@ -34,9 +34,10 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.NotNull;
import org.mockito.Mockito;
@@ -71,11 +72,14 @@
/** */
public static final int MESSAGES_PER_REQUEST = 10;
+ /** Time to wait for the message in milliseconds. */
+ private static final long MSG_WAIT_TIMEOUT = 1_000L;
+
/** */
private final Map<String, Publisher> publishers = new HashMap<>();
/** */
- private final Queue<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>();
+ private final BlockingDeque<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>();
public SubscriberStubSettings createSubscriberStub() throws IOException {
CredentialsProvider credentialsProvider = NoCredentialsProvider.create();
@@ -136,8 +140,18 @@
private void pullMessages(ClientCall.Listener<PullResponse> listener, Metadata metadata) {
PullResponse.Builder pullResponse = PullResponse.newBuilder();
- for(int i = 0; i < MESSAGES_PER_REQUEST; i++) {
- pullResponse.addReceivedMessages(ReceivedMessage.newBuilder().mergeMessage(blockingQueue.remove()).build());
+ try {
+ for (int i = 0; i < MESSAGES_PER_REQUEST; i++) {
+ PubsubMessage msg = blockingQueue.poll(MSG_WAIT_TIMEOUT, TimeUnit.MILLISECONDS);
+
+ if (msg == null)
+ break;
+
+ pullResponse.addReceivedMessages(ReceivedMessage.newBuilder().mergeMessage(msg).build());
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
listener.onMessage(pullResponse.build());
diff --git a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java
index 300efe3..b286b94 100644
--- a/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java
+++ b/modules/pub-sub-ext/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java
@@ -180,9 +180,6 @@
pubSubStmr.setSingleTupleExtractor(singleTupleExtractor());
- // Start Pub/Sub streamer.
- pubSubStmr.start();
-
final CountDownLatch latch = new CountDownLatch(CNT);
IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
@@ -210,6 +207,9 @@
ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
+ // Start Pub/Sub streamer.
+ pubSubStmr.start();
+
// Checks all events successfully processed in 10 seconds.
assertTrue("Failed to wait latch completion, still wait " + latch.getCount() + " events",
latch.await(10, TimeUnit.SECONDS));