SLING-8554 - Add second message to test while loop in poller
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
index 431f692..ae28d4c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
@@ -62,34 +62,19 @@
public void testSendReceive() throws Exception {
HandlerAdapter<DiscoveryMessage> handler = HandlerAdapter.create(DiscoveryMessage.class, this::handle);
Closeable poller = provider.createPoller(topicName, Reset.earliest, handler);
- DiscoveryMessage msg = DiscoveryMessage.newBuilder()
- .setSubAgentName("sub1agent")
- .setSubSlingId("subsling")
- .setSubscriberConfiguration(SubscriberConfiguration
- .newBuilder()
- .setEditable(false)
- .setMaxRetries(-1)
- .build())
- .build();
MessageSender<DiscoveryMessage> messageSender = provider.createSender();
- messageSender.send(topicName, msg);
+ messageSender.send(topicName, createMessage());
assertReceived("Consumer started from earliest .. should see our message");
+ messageSender.send(topicName, createMessage());
+ assertReceived("Should also consume a second message");
poller.close();
}
@Test
public void testAssign() throws Exception {
- DiscoveryMessage msg = DiscoveryMessage.newBuilder()
- .setSubAgentName("sub1agent")
- .setSubSlingId("subsling")
- .setSubscriberConfiguration(SubscriberConfiguration
- .newBuilder()
- .setEditable(false)
- .setMaxRetries(-1)
- .build())
- .build();
+ DiscoveryMessage msg = createMessage();
MessageSender<DiscoveryMessage> messageSender = provider.createSender();
messageSender.send(topicName, msg);
@@ -116,6 +101,18 @@
}
}
+ private DiscoveryMessage createMessage() {
+ return DiscoveryMessage.newBuilder()
+ .setSubAgentName("sub1agent")
+ .setSubSlingId("subsling")
+ .setSubscriberConfiguration(SubscriberConfiguration
+ .newBuilder()
+ .setEditable(false)
+ .setMaxRetries(-1)
+ .build())
+ .build();
+ }
+
private void assertReceived(String message) throws InterruptedException {
assertTrue(message, sem.tryAcquire(30, TimeUnit.SECONDS));
}