Multiple unit tests improvements (#5439)

* Multiple unit tests improvements

* Fixed Storm integration tests

* Fixed StringSchema static initialization

* Peg number of forks to number of cores

* Updated to 4 forks

* Fixed resourcel leak in managed ledger tests

* Use different temp folders for PulsarFunctionState test

* Increase retries count for PulsarFunctionE2ESecurityTest

* Fixed Flume connector test

* Fixed race conditions in primitive schema types static initialization

* Improve port manager

* Removed PulsarFunctionStateTest to move to integration tests

* Fixed dangling class reference
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
index 5934151..8b23b84 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
@@ -25,7 +25,6 @@
 import java.nio.file.Files;
 import java.nio.file.LinkOption;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.nio.file.attribute.FileAttribute;
@@ -46,50 +45,43 @@
 
 public abstract class AbstractFileTests {
 
-    public static final String TMP_DIR = "/tmp/foo";
-    
     protected BlockingQueue<File> workQueue;
     protected BlockingQueue<File> inProcess;
     protected BlockingQueue<File> recentlyProcessed;
     protected BlockingQueue<File> producedFiles;
-    
-    protected TestFileGenerator generatorThread; 
+
+    protected TestFileGenerator generatorThread;
     protected FileListingThread listingThread;
     protected ExecutorService executor;
-    
+
+    protected Path directory;
+
     @BeforeMethod
     public void init() throws IOException {
-        
         // Create the directory we are going to read from
-        Path directory = Paths.get(TMP_DIR);
-        
-        if (!Files.exists(directory, LinkOption.NOFOLLOW_LINKS)) {
-            Files.createDirectory(directory, getPermissions());
-        }
-        
+        directory = Files.createTempDirectory("pulsar-io-file-tests", getPermissions());
+
         workQueue = Mockito.spy(new LinkedBlockingQueue<>());
-        inProcess = Mockito.spy(new LinkedBlockingQueue<>());         
+        inProcess = Mockito.spy(new LinkedBlockingQueue<>());
         recentlyProcessed = Mockito.spy(new LinkedBlockingQueue<>());
         producedFiles = Mockito.spy(new LinkedBlockingQueue<>());
         executor = Executors.newFixedThreadPool(10);
     }
-    
+
     @AfterMethod
     public void tearDown() throws Exception {
         // Shutdown all of the processing threads
         stopThreads();
-        
+
         // Delete the directory and all the files
         cleanUp();
     }
-    
-    protected static final void cleanUp() throws IOException {
-        Path directory = Paths.get(TMP_DIR);
-        
+
+    protected final void cleanUp() throws IOException {
         if (!Files.exists(directory, LinkOption.NOFOLLOW_LINKS)) {
             return;
         }
-        
+
         Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
@@ -104,35 +96,35 @@
            }
         });
     }
-    
+
     protected void stopThreads() throws Exception {
         executor.shutdown();
         try {
             if (!executor.awaitTermination(800, TimeUnit.MILLISECONDS)) {
                 executor.shutdownNow();
-            } 
+            }
         } catch (InterruptedException e) {
             executor.shutdownNow();
         }
     }
-    
+
     protected final void generateFiles(int numFiles) throws IOException, InterruptedException, ExecutionException {
-        generateFiles(numFiles, 1, TMP_DIR);
+        generateFiles(numFiles, 1, directory.toString());
     }
-    
+
     protected final void generateFiles(int numFiles, int numLines) throws IOException, InterruptedException, ExecutionException {
-        generateFiles(numFiles, numLines, TMP_DIR);
+        generateFiles(numFiles, numLines, directory.toString());
     }
-    
+
     protected final void generateFiles(int numFiles, int numLines, String directory) throws IOException, InterruptedException, ExecutionException {
         generatorThread = new TestFileGenerator(producedFiles, numFiles, 1, numLines, directory, "prefix", ".txt", getPermissions());
         Future<?> f = executor.submit(generatorThread);
         f.get();
     }
-   
+
     protected static final FileAttribute<Set<PosixFilePermission>> getPermissions() {
         Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxrwxrwx");
         return PosixFilePermissions.asFileAttribute(perms);
     }
-    
+
 }
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
index a4582bc..30284a5 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
@@ -37,19 +37,19 @@
 
 @SuppressWarnings("unchecked")
 public class FileConsumerThreadTests extends AbstractFileTests {
-    
+
     private PushSource<byte[]> consumer;
     private FileConsumerThread consumerThread;
 
     @Test
     public final void singleFileTest() throws IOException {
-        
+
         consumer = Mockito.mock(PushSource.class);
         Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-        
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
-        
+        map.put("inputDirectory", directory.toString());
+
         try {
             generateFiles(1);
             listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
@@ -57,14 +57,14 @@
             executor.execute(listingThread);
             executor.execute(consumerThread);
             Thread.sleep(2000);
-            
+
             for (File produced : producedFiles) {
                 verify(workQueue, times(1)).offer(produced);
                 verify(inProcess, times(1)).add(produced);
                 verify(inProcess, times(1)).remove(produced);
                 verify(recentlyProcessed, times(1)).add(produced);
             }
-            
+
             verify(workQueue, times(1)).offer(any(File.class));
             verify(workQueue, atLeast(1)).take();
             verify(inProcess, times(1)).add(any(File.class));
@@ -73,18 +73,18 @@
             verify(consumer, times(1)).consume((Record<byte[]>) any(Record.class));
         } catch (InterruptedException | ExecutionException e) {
             fail("Unable to generate files" + e.getLocalizedMessage());
-        } 
+        }
     }
-    
+
     @Test
     public final void mulitpleFileTest() throws IOException {
-        
+
         consumer = Mockito.mock(PushSource.class);
         Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-        
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
-        
+        map.put("inputDirectory", directory.toString());
+
         try {
             generateFiles(50, 2);
             listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
@@ -92,14 +92,14 @@
             executor.execute(listingThread);
             executor.execute(consumerThread);
             Thread.sleep(2000);
-            
+
             for (File produced : producedFiles) {
                 verify(workQueue, times(1)).offer(produced);
                 verify(inProcess, times(1)).add(produced);
                 verify(inProcess, times(1)).remove(produced);
                 verify(recentlyProcessed, times(1)).add(produced);
             }
-            
+
             verify(workQueue, times(50)).offer(any(File.class));
             verify(workQueue, atLeast(50)).take();
             verify(inProcess, times(50)).add(any(File.class));
@@ -108,18 +108,18 @@
             verify(consumer, times(100)).consume((Record<byte[]>) any(Record.class));
         } catch (InterruptedException | ExecutionException e) {
             fail("Unable to generate files" + e.getLocalizedMessage());
-        } 
+        }
     }
-    
+
     @Test
     public final void multiLineFileTest() throws IOException {
-        
+
         consumer = Mockito.mock(PushSource.class);
         Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-        
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
-        
+        map.put("inputDirectory", directory.toString());
+
         try {
             generateFiles(1, 10);
             listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
@@ -127,14 +127,14 @@
             executor.execute(listingThread);
             executor.execute(consumerThread);
             Thread.sleep(2000);
-            
+
             for (File produced : producedFiles) {
                 verify(workQueue, times(1)).offer(produced);
                 verify(inProcess, times(1)).add(produced);
                 verify(inProcess, times(1)).remove(produced);
                 verify(recentlyProcessed, times(1)).add(produced);
             }
-            
+
             verify(workQueue, times(1)).offer(any(File.class));
             verify(workQueue, atLeast(1)).take();
             verify(inProcess, times(1)).add(any(File.class));
@@ -143,6 +143,6 @@
             verify(consumer, times(10)).consume((Record<byte[]>) any(Record.class));
         } catch (InterruptedException | ExecutionException e) {
             fail("Unable to generate files" + e.getLocalizedMessage());
-        } 
+        }
     }
 }
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
index 5be101f..01e5143 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
@@ -33,13 +33,13 @@
 
 
 public class FileListingThreadTests extends AbstractFileTests {
-     
+
     @Test
-    public final void singleFileTest() throws IOException {  
-        
+    public final void singleFileTest() throws IOException {
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
-      
+        map.put("inputDirectory", directory.toString());
+
         try {
             generateFiles(1);
             listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
@@ -47,44 +47,44 @@
             Thread.sleep(2000);
             verify(producedFiles, times(1)).put(any(File.class));
             verify(workQueue, times(1)).offer(any(File.class));
-            
+
             for (File produced : producedFiles) {
                 verify(workQueue, times(1)).offer(produced);
             }
-            
+
         } catch (InterruptedException | ExecutionException e) {
             fail("Unable to generate files" + e.getLocalizedMessage());
-        } 
+        }
     }
-    
+
     @Test
     public final void fiftyFileTest() throws IOException {
-        
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
-        
+        map.put("inputDirectory", directory.toString());
+
         try {
             generateFiles(50);
             listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
             executor.execute(listingThread);
             Thread.sleep(2000);
             verify(workQueue, times(50)).offer(any(File.class));
-            
+
             for (File produced : producedFiles) {
                 verify(workQueue, times(1)).offer(produced);
             }
-            
+
         } catch (InterruptedException | ExecutionException e) {
             fail("Unable to generate files" + e.getLocalizedMessage());
-        } 
+        }
     }
-    
+
     @Test
     public final void minimumSizeTest() throws IOException {
-        
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
-        
+        map.put("inputDirectory", directory.toString());
+
         try {
             // Create 50 zero size files
             generateFiles(50, 0);
@@ -94,16 +94,16 @@
             verify(workQueue, times(0)).offer(any(File.class));
         } catch (InterruptedException | ExecutionException e) {
             fail("Unable to generate files" + e.getLocalizedMessage());
-        } 
+        }
     }
-    
+
     @Test
     public final void maximumSizeTest() throws IOException {
-        
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
+        map.put("inputDirectory", directory.toString());
         map.put("maximumSize", "1000");
-        
+
         try {
             // Create 5 files that exceed the limit and 45 that don't
             generateFiles(5, 1000);
@@ -118,14 +118,14 @@
             cleanUp();
         }
     }
-    
+
     @Test
     public final void minimumAgeTest() throws IOException {
-        
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
+        map.put("inputDirectory", directory.toString());
         map.put("minimumFileAge", "5000");
-        
+
         try {
             // Create 5 files that will be too "new" for processing
             generateFiles(5);
@@ -139,19 +139,19 @@
             cleanUp();
         }
     }
-    
+
     @Test
     public final void maximumAgeTest() throws IOException {
-        
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
+        map.put("inputDirectory", directory.toString());
         map.put("maximumFileAge", "5000");
-        
+
         try {
             // Create 5 files that will be processed
             generateFiles(5);
             Thread.sleep(5000);
-            
+
             // Create 5 files that will be too "old" for processing
             generateFiles(5);
             listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
@@ -164,20 +164,20 @@
             cleanUp();
         }
     }
-    
+
     @Test
     public final void doRecurseTest() throws IOException {
-       
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
+        map.put("inputDirectory", directory.toString());
         map.put("recurse", Boolean.TRUE);
-        
+
         try {
             // Create 5 files in the root folder
             generateFiles(5);
-            
+
             // Create 5 files in a sub-folder
-            generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir");
+            generateFiles(5, 1, directory.toString() + File.separator + "sub-dir");
             listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
             executor.execute(listingThread);
             Thread.sleep(2000);
@@ -186,22 +186,22 @@
             fail("Unable to generate files" + e.getLocalizedMessage());
         } finally {
             cleanUp();
-        }       
+        }
     }
-    
+
     @Test
     public final void doNotRecurseTest() throws IOException {
-        
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
+        map.put("inputDirectory", directory.toString());
         map.put("recurse", Boolean.FALSE);
-        
+
         try {
             // Create 5 files in the root folder
             generateFiles(5);
-            
+
             // Create 5 files in a sub-folder
-            generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir");
+            generateFiles(5, 1, directory.toString() + File.separator + "sub-dir");
             listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
             executor.execute(listingThread);
             Thread.sleep(2000);
@@ -210,21 +210,21 @@
             fail("Unable to generate files" + e.getLocalizedMessage());
         } finally {
             cleanUp();
-        }    
+        }
     }
-    
+
     @Test
     public final void pathFilterTest() throws IOException {
-         
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
+        map.put("inputDirectory", directory.toString());
         map.put("recurse", Boolean.TRUE);
         map.put("pathFilter", "sub-.*");
-        
+
         try {
             // Create 5 files in a sub-folder
-            generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir-a");
-            generateFiles(5, 1, TMP_DIR + File.separator + "dir-b");
+            generateFiles(5, 1, directory.toString() + File.separator + "sub-dir-a");
+            generateFiles(5, 1, directory.toString() + File.separator + "dir-b");
             listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
             executor.execute(listingThread);
             Thread.sleep(2000);
@@ -233,6 +233,6 @@
             fail("Unable to generate files" + e.getLocalizedMessage());
         } finally {
             cleanUp();
-        }       
+        }
     }
 }
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
index 9970a55..f37737c 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
@@ -45,14 +45,14 @@
 
     @Test
     public final void singleFileTest() throws IOException {
-        
+
         consumer = Mockito.mock(PushSource.class);
         Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-        
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
+        map.put("inputDirectory", directory.toString());
         map.put("keepFile", Boolean.FALSE);
-        
+
         try {
             generateFiles(1);
             fileConfig = FileSourceConfig.load(map);
@@ -63,35 +63,35 @@
             executor.execute(consumerThread);
             executor.execute(cleanupThread);
             Thread.sleep(2000);
-            
+
             for (File produced : producedFiles) {
                 verify(workQueue, times(1)).offer(produced);
                 verify(inProcess, times(1)).add(produced);
                 verify(inProcess, times(1)).remove(produced);
                 verify(recentlyProcessed, times(1)).add(produced);
             }
-            
+
             verify(workQueue, times(1)).offer(any(File.class));
             verify(workQueue, atLeast(1)).take();
             verify(inProcess, times(1)).add(any(File.class));
             verify(inProcess, times(1)).remove(any(File.class));
             verify(recentlyProcessed, times(1)).add(any(File.class));
-            verify(recentlyProcessed, times(2)).take(); 
+            verify(recentlyProcessed, times(2)).take();
         } catch (InterruptedException | ExecutionException e) {
             fail("Unable to generate files" + e.getLocalizedMessage());
-        } 
+        }
     }
-    
+
     @Test
     public final void mulitpleFileTest() throws IOException {
-        
+
         consumer = Mockito.mock(PushSource.class);
         Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-       
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
+        map.put("inputDirectory", directory.toString());
         map.put("keepFile", Boolean.FALSE);
-        
+
         try {
             generateFiles(50);
             fileConfig = FileSourceConfig.load(map);
@@ -102,37 +102,37 @@
             executor.execute(consumerThread);
             executor.execute(cleanupThread);
             Thread.sleep(2000);
-            
+
             for (File produced : producedFiles) {
                 verify(workQueue, times(1)).offer(produced);
                 verify(inProcess, times(1)).add(produced);
                 verify(inProcess, times(1)).remove(produced);
                 verify(recentlyProcessed, times(1)).add(produced);
             }
-            
+
             verify(workQueue, times(50)).offer(any(File.class));
             verify(workQueue, atLeast(50)).take();
             verify(inProcess, times(50)).add(any(File.class));
             verify(inProcess, times(50)).remove(any(File.class));
             verify(recentlyProcessed, times(50)).add(any(File.class));
             verify(recentlyProcessed, times(50)).add(any(File.class));
-            verify(recentlyProcessed, times(51)).take(); 
+            verify(recentlyProcessed, times(51)).take();
         } catch (InterruptedException | ExecutionException e) {
             fail("Unable to generate files" + e.getLocalizedMessage());
-        } 
+        }
     }
-    
+
     @Test
     public final void keepFileTest() throws IOException {
-        
+
         consumer = Mockito.mock(PushSource.class);
         Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-       
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
+        map.put("inputDirectory", directory.toString());
         map.put("keepFile", Boolean.TRUE);
         map.put("pollingInterval", 1000L);
-        
+
         try {
             generateFiles(1);
             fileConfig = FileSourceConfig.load(map);
@@ -143,55 +143,55 @@
             executor.execute(consumerThread);
             executor.execute(cleanupThread);
             Thread.sleep(7900);  // Should pull the same file 5 times?
-            
+
             for (File produced : producedFiles) {
                 verify(workQueue, atLeast(4)).offer(produced);
                 verify(inProcess, atLeast(4)).add(produced);
                 verify(inProcess, atLeast(4)).remove(produced);
                 verify(recentlyProcessed, atLeast(4)).add(produced);
             }
-            
-            verify(recentlyProcessed, atLeast(5)).take(); 
+
+            verify(recentlyProcessed, atLeast(5)).take();
         } catch (InterruptedException | ExecutionException e) {
             fail("Unable to generate files" + e.getLocalizedMessage());
-        } 
+        }
     }
-    
+
     @Test
     public final void continuousRunTest() throws IOException {
-        
+
         consumer = Mockito.mock(PushSource.class);
         Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-       
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
+        map.put("inputDirectory", directory.toString());
         map.put("keepFile", Boolean.FALSE);
         map.put("pollingInterval", 100);
         fileConfig = FileSourceConfig.load(map);
-        
+
         try {
             // Start producing files, with a .1 sec delay between
-            generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1, TMP_DIR, "continuous", ".txt", getPermissions());
+            generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1, directory.toString(), "continuous", ".txt", getPermissions());
             executor.execute(generatorThread);
-            
+
             listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
             consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
             cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
             executor.execute(listingThread);
             executor.execute(consumerThread);
             executor.execute(cleanupThread);
-            
+
             // Run for 30 seconds
             Thread.sleep(30000);
-            
+
             // Stop producing files
             generatorThread.halt();
-            
+
             // Let the consumer catch up
             while (!workQueue.isEmpty() && !inProcess.isEmpty() && !recentlyProcessed.isEmpty()) {
                 Thread.sleep(2000);
             }
-            
+
             // Make sure every single file was processed.
             for (File produced : producedFiles) {
                 verify(workQueue, times(1)).offer(produced);
@@ -199,29 +199,29 @@
                 verify(inProcess, times(1)).remove(produced);
                 verify(recentlyProcessed, times(1)).add(produced);
             }
-            
+
         } catch (InterruptedException e) {
             fail("Unable to generate files" + e.getLocalizedMessage());
-        } 
+        }
     }
-    
+
     @Test
     public final void multipleConsumerTest() throws IOException {
-        
+
         consumer = Mockito.mock(PushSource.class);
         Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-       
+
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", TMP_DIR);
+        map.put("inputDirectory", directory.toString());
         map.put("keepFile", Boolean.FALSE);
         map.put("pollingInterval", 100);
         fileConfig = FileSourceConfig.load(map);
-        
+
         try {
             // Start producing files, with a .1 sec delay between
-            generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1, TMP_DIR, "continuous", ".txt", getPermissions());
+            generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1, directory.toString(), "continuous", ".txt", getPermissions());
             executor.execute(generatorThread);
-            
+
             listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
             consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
             FileConsumerThread consumerThread2 = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
@@ -232,18 +232,18 @@
             executor.execute(consumerThread2);
             executor.execute(consumerThread3);
             executor.execute(cleanupThread);
-            
+
             // Run for 30 seconds
             Thread.sleep(30000);
-            
+
             // Stop producing files
             generatorThread.halt();
-            
+
             // Let the consumer catch up
             while (!workQueue.isEmpty() && !inProcess.isEmpty() && !recentlyProcessed.isEmpty()) {
                 Thread.sleep(2000);
             }
-            
+
             // Make sure every single file was processed exactly once.
             for (File produced : producedFiles) {
                 verify(workQueue, times(1)).offer(produced);
@@ -251,9 +251,9 @@
                 verify(inProcess, times(1)).remove(produced);
                 verify(recentlyProcessed, times(1)).add(produced);
             }
-            
+
         } catch (InterruptedException e) {
             fail("Unable to generate files" + e.getLocalizedMessage());
-        } 
+        }
     }
 }
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java
index 714deac..df7600b 100644
--- a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java
@@ -18,8 +18,23 @@
  */
 package org.apache.pulsar.io.flume.sink;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import com.google.common.collect.Maps;
-import org.apache.flume.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
@@ -28,21 +43,13 @@
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.flume.AbstractFlumeTests;
+import org.junit.Assert;
 import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import org.junit.Assert;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.mockito.Mockito.*;
 
 public class StringSinkTests extends AbstractFlumeTests {
 
@@ -55,13 +62,11 @@
 
     private AvroSource source;
     private Channel channel;
-    private InetAddress localhost;
 
     @BeforeMethod
     public void setUp() throws Exception {
         mockRecord = mock(Record.class);
         mockSinkContext = mock(SinkContext.class);
-        localhost = InetAddress.getByName("127.0.0.1");
         source = new AvroSource();
         channel = new MemoryChannel();
         Context context = new Context();
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java
index 8a3ebda..e667cb2 100644
--- a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java
@@ -20,6 +20,8 @@
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Maps;
+
+import org.apache.bookkeeper.test.PortManager;
 import org.apache.flume.*;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
@@ -54,7 +56,7 @@
         }
         Context context = new Context();
         context.put("hostname", "127.0.0.1");
-        context.put("port", "44444");
+        context.put("port", "44445");
         context.put("batch-size", String.valueOf(2));
         context.put("connect-timeout", String.valueOf(2000L));
         context.put("request-timeout", String.valueOf(3000L));
@@ -70,9 +72,9 @@
     @AfterMethod
     public void tearDown() throws Exception {
         sink.stop();
+        sink = null;
     }
 
-
     @Test
     public void TestOpenAndReadSource() throws Exception {
         Map<String, Object> conf = Maps.newHashMap();
diff --git a/pulsar-io/flume/src/test/resources/flume/sink.conf b/pulsar-io/flume/src/test/resources/flume/sink.conf
index e45e6f4..d8febf2 100644
--- a/pulsar-io/flume/src/test/resources/flume/sink.conf
+++ b/pulsar-io/flume/src/test/resources/flume/sink.conf
@@ -27,7 +27,7 @@
 # Describe/configure the source
 a1.sources.r1.type = avro
 a1.sources.r1.bind = 127.0.0.1
-a1.sources.r1.port = 44444
+a1.sources.r1.port = 44445
 
 # Describe the sink
 a1.sinks.k1.type = org.apache.pulsar.io.flume.source.SinkOfFlume