NIFI-8789 Corrected TestListenUDP to use getAvailableUdpPort()
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #5219.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
index bbbce1f..3fcfb4c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
@@ -31,17 +31,22 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
+import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
+@RunWith(MockitoJUnitRunner.class)
public class TestListenUDP {
private static final String LOCALHOST = "localhost";
@@ -50,10 +55,13 @@
private TestRunner runner;
+ @Mock
+ private ChannelResponder<DatagramChannel> responder;
+
@Before
public void setUp() throws Exception {
runner = TestRunners.newTestRunner(ListenUDP.class);
- port = NetworkUtils.availablePort();
+ port = NetworkUtils.getAvailableUdpPort();
runner.setProperty(ListenUDP.PORT, Integer.toString(port));
}
@@ -62,7 +70,7 @@
runner.setProperty(ListenUDP.PORT, "1");
runner.assertValid();
- runner.setProperty(ListenUDP.SENDING_HOST, "localhost");
+ runner.setProperty(ListenUDP.SENDING_HOST, LOCALHOST);
runner.assertNotValid();
runner.setProperty(ListenUDP.SENDING_HOST_PORT, "1234");
@@ -75,12 +83,9 @@
@Test
public void testDefaultBehavior() throws IOException, InterruptedException {
final List<String> messages = getMessages(15);
- final int expectedQueued = messages.size();
final int expectedTransferred = messages.size();
- // default behavior should produce a FlowFile per message sent
-
- run(new DatagramSocket(), messages, expectedQueued, expectedTransferred);
+ run(new DatagramSocket(), messages, expectedTransferred);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size());
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@@ -95,7 +100,7 @@
final List<String> messages = getMessages(20);
- run(new DatagramSocket(), messages, maxQueueSize, maxQueueSize);
+ run(new DatagramSocket(), messages, maxQueueSize);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, maxQueueSize);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@@ -110,10 +115,9 @@
runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "3");
final List<String> messages = getMessages(5);
- final int expectedQueued = messages.size();
final int expectedTransferred = 2;
- run(new DatagramSocket(), messages, expectedQueued, expectedTransferred);
+ run(new DatagramSocket(), messages, expectedTransferred);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, expectedTransferred);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@@ -131,55 +135,52 @@
public void testBatchingWithDifferentSenders() {
final String sender1 = "sender1";
final String sender2 = "sender2";
- final ChannelResponder responder = Mockito.mock(ChannelResponder.class);
final byte[] message = "test message".getBytes(StandardCharsets.UTF_8);
- final List<StandardEvent> mockEvents = new ArrayList<>();
- mockEvents.add(new StandardEvent(sender1, message, responder));
- mockEvents.add(new StandardEvent(sender1, message, responder));
- mockEvents.add(new StandardEvent(sender2, message, responder));
- mockEvents.add(new StandardEvent(sender2, message, responder));
+ final List<StandardEvent<DatagramChannel>> mockEvents = new ArrayList<>();
+ mockEvents.add(new StandardEvent<>(sender1, message, responder));
+ mockEvents.add(new StandardEvent<>(sender1, message, responder));
+ mockEvents.add(new StandardEvent<>(sender2, message, responder));
+ mockEvents.add(new StandardEvent<>(sender2, message, responder));
MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
runner = TestRunners.newTestRunner(mockListenUDP);
- runner.setProperty(ListenRELP.PORT, "1");
- runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
+ runner.setProperty(ListenUDP.PORT, "1");
+ runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "10");
// sending 4 messages with a batch size of 10, but should get 2 FlowFiles because of different senders
runner.run();
- runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+ runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 2);
verifyProvenance(2);
}
@Test
public void testRunWhenNoEventsAvailable() {
- final List<StandardEvent> mockEvents = new ArrayList<>();
+ final List<StandardEvent<DatagramChannel>> mockEvents = new ArrayList<>();
MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
runner = TestRunners.newTestRunner(mockListenUDP);
- runner.setProperty(ListenRELP.PORT, "1");
- runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
+ runner.setProperty(ListenUDP.PORT, "1");
+ runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "10");
runner.run(5);
- runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
+ runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 0);
}
@Test
public void testWithSendingHostAndPortSameAsSender() throws IOException, InterruptedException {
- final String sendingHost = "localhost";
- final Integer sendingPort = 21001;
- runner.setProperty(ListenUDP.SENDING_HOST, sendingHost);
+ final Integer sendingPort = NetworkUtils.getAvailableUdpPort();
+ runner.setProperty(ListenUDP.SENDING_HOST, LOCALHOST);
runner.setProperty(ListenUDP.SENDING_HOST_PORT, String.valueOf(sendingPort));
// bind to the same sending port that processor has for Sending Host Port
final DatagramSocket socket = new DatagramSocket(sendingPort);
final List<String> messages = getMessages(6);
- final int expectedQueued = messages.size();
final int expectedTransferred = messages.size();
- run(socket, messages, expectedQueued, expectedTransferred);
+ run(socket, messages, expectedTransferred);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size());
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@@ -214,11 +215,8 @@
}
}
- protected void run(final DatagramSocket socket, final List<String> messages, final int expectedQueueSize, final int expectedTransferred)
+ protected void run(final DatagramSocket socket, final List<String> messages, final int expectedTransferred)
throws IOException, InterruptedException {
-
-
-
// Run Processor and start Dispatcher without shutting down
runner.run(1, false, true);
@@ -242,9 +240,9 @@
// Extend ListenUDP to mock the ChannelDispatcher and allow us to return staged events
private static class MockListenUDP extends ListenUDP {
- private List<StandardEvent> mockEvents;
+ private final List<StandardEvent<DatagramChannel>> mockEvents;
- public MockListenUDP(List<StandardEvent> mockEvents) {
+ public MockListenUDP(List<StandardEvent<DatagramChannel>> mockEvents) {
this.mockEvents = mockEvents;
}
@@ -259,7 +257,5 @@
protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) {
return Mockito.mock(ChannelDispatcher.class);
}
-
}
-
}