NIFI-8789 Corrected TestListenUDP to use getAvailableUdpPort()

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5219.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
index bbbce1f..3fcfb4c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
@@ -31,17 +31,22 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
+import java.nio.channels.DatagramChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
+@RunWith(MockitoJUnitRunner.class)
 public class TestListenUDP {
 
     private static final String LOCALHOST = "localhost";
@@ -50,10 +55,13 @@
 
     private TestRunner runner;
 
+    @Mock
+    private ChannelResponder<DatagramChannel> responder;
+
     @Before
     public void setUp() throws Exception {
         runner = TestRunners.newTestRunner(ListenUDP.class);
-        port = NetworkUtils.availablePort();
+        port = NetworkUtils.getAvailableUdpPort();
         runner.setProperty(ListenUDP.PORT, Integer.toString(port));
     }
 
@@ -62,7 +70,7 @@
         runner.setProperty(ListenUDP.PORT, "1");
         runner.assertValid();
 
-        runner.setProperty(ListenUDP.SENDING_HOST, "localhost");
+        runner.setProperty(ListenUDP.SENDING_HOST, LOCALHOST);
         runner.assertNotValid();
 
         runner.setProperty(ListenUDP.SENDING_HOST_PORT, "1234");
@@ -75,12 +83,9 @@
     @Test
     public void testDefaultBehavior() throws IOException, InterruptedException {
         final List<String> messages = getMessages(15);
-        final int expectedQueued = messages.size();
         final int expectedTransferred = messages.size();
 
-        // default behavior should produce a FlowFile per message sent
-
-        run(new DatagramSocket(), messages, expectedQueued, expectedTransferred);
+        run(new DatagramSocket(), messages, expectedTransferred);
         runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size());
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@@ -95,7 +100,7 @@
 
         final List<String> messages = getMessages(20);
 
-        run(new DatagramSocket(), messages, maxQueueSize, maxQueueSize);
+        run(new DatagramSocket(), messages, maxQueueSize);
         runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, maxQueueSize);
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@@ -110,10 +115,9 @@
         runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "3");
 
         final List<String> messages = getMessages(5);
-        final int expectedQueued = messages.size();
         final int expectedTransferred = 2;
 
-        run(new DatagramSocket(), messages, expectedQueued, expectedTransferred);
+        run(new DatagramSocket(), messages, expectedTransferred);
         runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, expectedTransferred);
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@@ -131,55 +135,52 @@
     public void testBatchingWithDifferentSenders() {
         final String sender1 = "sender1";
         final String sender2 = "sender2";
-        final ChannelResponder responder = Mockito.mock(ChannelResponder.class);
         final byte[] message = "test message".getBytes(StandardCharsets.UTF_8);
 
-        final List<StandardEvent> mockEvents = new ArrayList<>();
-        mockEvents.add(new StandardEvent(sender1, message, responder));
-        mockEvents.add(new StandardEvent(sender1, message, responder));
-        mockEvents.add(new StandardEvent(sender2, message, responder));
-        mockEvents.add(new StandardEvent(sender2, message, responder));
+        final List<StandardEvent<DatagramChannel>> mockEvents = new ArrayList<>();
+        mockEvents.add(new StandardEvent<>(sender1, message, responder));
+        mockEvents.add(new StandardEvent<>(sender1, message, responder));
+        mockEvents.add(new StandardEvent<>(sender2, message, responder));
+        mockEvents.add(new StandardEvent<>(sender2, message, responder));
 
         MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenUDP);
-        runner.setProperty(ListenRELP.PORT, "1");
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
+        runner.setProperty(ListenUDP.PORT, "1");
+        runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "10");
 
         // sending 4 messages with a batch size of 10, but should get 2 FlowFiles because of different senders
         runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+        runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 2);
 
         verifyProvenance(2);
     }
 
     @Test
     public void testRunWhenNoEventsAvailable() {
-        final List<StandardEvent> mockEvents = new ArrayList<>();
+        final List<StandardEvent<DatagramChannel>> mockEvents = new ArrayList<>();
 
         MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenUDP);
-        runner.setProperty(ListenRELP.PORT, "1");
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
+        runner.setProperty(ListenUDP.PORT, "1");
+        runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "10");
 
         runner.run(5);
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
+        runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 0);
     }
 
     @Test
     public void testWithSendingHostAndPortSameAsSender() throws IOException, InterruptedException {
-        final String sendingHost = "localhost";
-        final Integer sendingPort = 21001;
-        runner.setProperty(ListenUDP.SENDING_HOST, sendingHost);
+        final Integer sendingPort = NetworkUtils.getAvailableUdpPort();
+        runner.setProperty(ListenUDP.SENDING_HOST, LOCALHOST);
         runner.setProperty(ListenUDP.SENDING_HOST_PORT, String.valueOf(sendingPort));
 
         // bind to the same sending port that processor has for Sending Host Port
         final DatagramSocket socket = new DatagramSocket(sendingPort);
 
         final List<String> messages = getMessages(6);
-        final int expectedQueued = messages.size();
         final int expectedTransferred = messages.size();
 
-        run(socket, messages, expectedQueued, expectedTransferred);
+        run(socket, messages, expectedTransferred);
         runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size());
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@@ -214,11 +215,8 @@
         }
     }
 
-    protected void run(final DatagramSocket socket, final List<String> messages, final int expectedQueueSize, final int expectedTransferred)
+    protected void run(final DatagramSocket socket, final List<String> messages, final int expectedTransferred)
             throws IOException, InterruptedException {
-
-
-
         // Run Processor and start Dispatcher without shutting down
         runner.run(1, false, true);
 
@@ -242,9 +240,9 @@
     // Extend ListenUDP to mock the ChannelDispatcher and allow us to return staged events
     private static class MockListenUDP extends ListenUDP {
 
-        private List<StandardEvent> mockEvents;
+        private final List<StandardEvent<DatagramChannel>> mockEvents;
 
-        public MockListenUDP(List<StandardEvent> mockEvents) {
+        public MockListenUDP(List<StandardEvent<DatagramChannel>> mockEvents) {
             this.mockEvents = mockEvents;
         }
 
@@ -259,7 +257,5 @@
         protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) {
             return Mockito.mock(ChannelDispatcher.class);
         }
-
     }
-
 }