Multiple unit tests improvements (#5439)
* Multiple unit tests improvements
* Fixed Storm integration tests
* Fixed StringSchema static initialization
* Peg number of forks to number of cores
* Updated to 4 forks
* Fixed resourcel leak in managed ledger tests
* Use different temp folders for PulsarFunctionState test
* Increase retries count for PulsarFunctionE2ESecurityTest
* Fixed Flume connector test
* Fixed race conditions in primitive schema types static initialization
* Improve port manager
* Removed PulsarFunctionStateTest to move to integration tests
* Fixed dangling class reference
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
index 5934151..8b23b84 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
@@ -25,7 +25,6 @@
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttribute;
@@ -46,50 +45,43 @@
public abstract class AbstractFileTests {
- public static final String TMP_DIR = "/tmp/foo";
-
protected BlockingQueue<File> workQueue;
protected BlockingQueue<File> inProcess;
protected BlockingQueue<File> recentlyProcessed;
protected BlockingQueue<File> producedFiles;
-
- protected TestFileGenerator generatorThread;
+
+ protected TestFileGenerator generatorThread;
protected FileListingThread listingThread;
protected ExecutorService executor;
-
+
+ protected Path directory;
+
@BeforeMethod
public void init() throws IOException {
-
// Create the directory we are going to read from
- Path directory = Paths.get(TMP_DIR);
-
- if (!Files.exists(directory, LinkOption.NOFOLLOW_LINKS)) {
- Files.createDirectory(directory, getPermissions());
- }
-
+ directory = Files.createTempDirectory("pulsar-io-file-tests", getPermissions());
+
workQueue = Mockito.spy(new LinkedBlockingQueue<>());
- inProcess = Mockito.spy(new LinkedBlockingQueue<>());
+ inProcess = Mockito.spy(new LinkedBlockingQueue<>());
recentlyProcessed = Mockito.spy(new LinkedBlockingQueue<>());
producedFiles = Mockito.spy(new LinkedBlockingQueue<>());
executor = Executors.newFixedThreadPool(10);
}
-
+
@AfterMethod
public void tearDown() throws Exception {
// Shutdown all of the processing threads
stopThreads();
-
+
// Delete the directory and all the files
cleanUp();
}
-
- protected static final void cleanUp() throws IOException {
- Path directory = Paths.get(TMP_DIR);
-
+
+ protected final void cleanUp() throws IOException {
if (!Files.exists(directory, LinkOption.NOFOLLOW_LINKS)) {
return;
}
-
+
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
@@ -104,35 +96,35 @@
}
});
}
-
+
protected void stopThreads() throws Exception {
executor.shutdown();
try {
if (!executor.awaitTermination(800, TimeUnit.MILLISECONDS)) {
executor.shutdownNow();
- }
+ }
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
-
+
protected final void generateFiles(int numFiles) throws IOException, InterruptedException, ExecutionException {
- generateFiles(numFiles, 1, TMP_DIR);
+ generateFiles(numFiles, 1, directory.toString());
}
-
+
protected final void generateFiles(int numFiles, int numLines) throws IOException, InterruptedException, ExecutionException {
- generateFiles(numFiles, numLines, TMP_DIR);
+ generateFiles(numFiles, numLines, directory.toString());
}
-
+
protected final void generateFiles(int numFiles, int numLines, String directory) throws IOException, InterruptedException, ExecutionException {
generatorThread = new TestFileGenerator(producedFiles, numFiles, 1, numLines, directory, "prefix", ".txt", getPermissions());
Future<?> f = executor.submit(generatorThread);
f.get();
}
-
+
protected static final FileAttribute<Set<PosixFilePermission>> getPermissions() {
Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxrwxrwx");
return PosixFilePermissions.asFileAttribute(perms);
}
-
+
}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
index a4582bc..30284a5 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
@@ -37,19 +37,19 @@
@SuppressWarnings("unchecked")
public class FileConsumerThreadTests extends AbstractFileTests {
-
+
private PushSource<byte[]> consumer;
private FileConsumerThread consumerThread;
@Test
public final void singleFileTest() throws IOException {
-
+
consumer = Mockito.mock(PushSource.class);
Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
-
+ map.put("inputDirectory", directory.toString());
+
try {
generateFiles(1);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
@@ -57,14 +57,14 @@
executor.execute(listingThread);
executor.execute(consumerThread);
Thread.sleep(2000);
-
+
for (File produced : producedFiles) {
verify(workQueue, times(1)).offer(produced);
verify(inProcess, times(1)).add(produced);
verify(inProcess, times(1)).remove(produced);
verify(recentlyProcessed, times(1)).add(produced);
}
-
+
verify(workQueue, times(1)).offer(any(File.class));
verify(workQueue, atLeast(1)).take();
verify(inProcess, times(1)).add(any(File.class));
@@ -73,18 +73,18 @@
verify(consumer, times(1)).consume((Record<byte[]>) any(Record.class));
} catch (InterruptedException | ExecutionException e) {
fail("Unable to generate files" + e.getLocalizedMessage());
- }
+ }
}
-
+
@Test
public final void mulitpleFileTest() throws IOException {
-
+
consumer = Mockito.mock(PushSource.class);
Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
-
+ map.put("inputDirectory", directory.toString());
+
try {
generateFiles(50, 2);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
@@ -92,14 +92,14 @@
executor.execute(listingThread);
executor.execute(consumerThread);
Thread.sleep(2000);
-
+
for (File produced : producedFiles) {
verify(workQueue, times(1)).offer(produced);
verify(inProcess, times(1)).add(produced);
verify(inProcess, times(1)).remove(produced);
verify(recentlyProcessed, times(1)).add(produced);
}
-
+
verify(workQueue, times(50)).offer(any(File.class));
verify(workQueue, atLeast(50)).take();
verify(inProcess, times(50)).add(any(File.class));
@@ -108,18 +108,18 @@
verify(consumer, times(100)).consume((Record<byte[]>) any(Record.class));
} catch (InterruptedException | ExecutionException e) {
fail("Unable to generate files" + e.getLocalizedMessage());
- }
+ }
}
-
+
@Test
public final void multiLineFileTest() throws IOException {
-
+
consumer = Mockito.mock(PushSource.class);
Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
-
+ map.put("inputDirectory", directory.toString());
+
try {
generateFiles(1, 10);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
@@ -127,14 +127,14 @@
executor.execute(listingThread);
executor.execute(consumerThread);
Thread.sleep(2000);
-
+
for (File produced : producedFiles) {
verify(workQueue, times(1)).offer(produced);
verify(inProcess, times(1)).add(produced);
verify(inProcess, times(1)).remove(produced);
verify(recentlyProcessed, times(1)).add(produced);
}
-
+
verify(workQueue, times(1)).offer(any(File.class));
verify(workQueue, atLeast(1)).take();
verify(inProcess, times(1)).add(any(File.class));
@@ -143,6 +143,6 @@
verify(consumer, times(10)).consume((Record<byte[]>) any(Record.class));
} catch (InterruptedException | ExecutionException e) {
fail("Unable to generate files" + e.getLocalizedMessage());
- }
+ }
}
}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
index 5be101f..01e5143 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
@@ -33,13 +33,13 @@
public class FileListingThreadTests extends AbstractFileTests {
-
+
@Test
- public final void singleFileTest() throws IOException {
-
+ public final void singleFileTest() throws IOException {
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
-
+ map.put("inputDirectory", directory.toString());
+
try {
generateFiles(1);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
@@ -47,44 +47,44 @@
Thread.sleep(2000);
verify(producedFiles, times(1)).put(any(File.class));
verify(workQueue, times(1)).offer(any(File.class));
-
+
for (File produced : producedFiles) {
verify(workQueue, times(1)).offer(produced);
}
-
+
} catch (InterruptedException | ExecutionException e) {
fail("Unable to generate files" + e.getLocalizedMessage());
- }
+ }
}
-
+
@Test
public final void fiftyFileTest() throws IOException {
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
-
+ map.put("inputDirectory", directory.toString());
+
try {
generateFiles(50);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
Thread.sleep(2000);
verify(workQueue, times(50)).offer(any(File.class));
-
+
for (File produced : producedFiles) {
verify(workQueue, times(1)).offer(produced);
}
-
+
} catch (InterruptedException | ExecutionException e) {
fail("Unable to generate files" + e.getLocalizedMessage());
- }
+ }
}
-
+
@Test
public final void minimumSizeTest() throws IOException {
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
-
+ map.put("inputDirectory", directory.toString());
+
try {
// Create 50 zero size files
generateFiles(50, 0);
@@ -94,16 +94,16 @@
verify(workQueue, times(0)).offer(any(File.class));
} catch (InterruptedException | ExecutionException e) {
fail("Unable to generate files" + e.getLocalizedMessage());
- }
+ }
}
-
+
@Test
public final void maximumSizeTest() throws IOException {
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
+ map.put("inputDirectory", directory.toString());
map.put("maximumSize", "1000");
-
+
try {
// Create 5 files that exceed the limit and 45 that don't
generateFiles(5, 1000);
@@ -118,14 +118,14 @@
cleanUp();
}
}
-
+
@Test
public final void minimumAgeTest() throws IOException {
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
+ map.put("inputDirectory", directory.toString());
map.put("minimumFileAge", "5000");
-
+
try {
// Create 5 files that will be too "new" for processing
generateFiles(5);
@@ -139,19 +139,19 @@
cleanUp();
}
}
-
+
@Test
public final void maximumAgeTest() throws IOException {
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
+ map.put("inputDirectory", directory.toString());
map.put("maximumFileAge", "5000");
-
+
try {
// Create 5 files that will be processed
generateFiles(5);
Thread.sleep(5000);
-
+
// Create 5 files that will be too "old" for processing
generateFiles(5);
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
@@ -164,20 +164,20 @@
cleanUp();
}
}
-
+
@Test
public final void doRecurseTest() throws IOException {
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
+ map.put("inputDirectory", directory.toString());
map.put("recurse", Boolean.TRUE);
-
+
try {
// Create 5 files in the root folder
generateFiles(5);
-
+
// Create 5 files in a sub-folder
- generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir");
+ generateFiles(5, 1, directory.toString() + File.separator + "sub-dir");
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
Thread.sleep(2000);
@@ -186,22 +186,22 @@
fail("Unable to generate files" + e.getLocalizedMessage());
} finally {
cleanUp();
- }
+ }
}
-
+
@Test
public final void doNotRecurseTest() throws IOException {
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
+ map.put("inputDirectory", directory.toString());
map.put("recurse", Boolean.FALSE);
-
+
try {
// Create 5 files in the root folder
generateFiles(5);
-
+
// Create 5 files in a sub-folder
- generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir");
+ generateFiles(5, 1, directory.toString() + File.separator + "sub-dir");
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
Thread.sleep(2000);
@@ -210,21 +210,21 @@
fail("Unable to generate files" + e.getLocalizedMessage());
} finally {
cleanUp();
- }
+ }
}
-
+
@Test
public final void pathFilterTest() throws IOException {
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
+ map.put("inputDirectory", directory.toString());
map.put("recurse", Boolean.TRUE);
map.put("pathFilter", "sub-.*");
-
+
try {
// Create 5 files in a sub-folder
- generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir-a");
- generateFiles(5, 1, TMP_DIR + File.separator + "dir-b");
+ generateFiles(5, 1, directory.toString() + File.separator + "sub-dir-a");
+ generateFiles(5, 1, directory.toString() + File.separator + "dir-b");
listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed);
executor.execute(listingThread);
Thread.sleep(2000);
@@ -233,6 +233,6 @@
fail("Unable to generate files" + e.getLocalizedMessage());
} finally {
cleanUp();
- }
+ }
}
}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
index 9970a55..f37737c 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
@@ -45,14 +45,14 @@
@Test
public final void singleFileTest() throws IOException {
-
+
consumer = Mockito.mock(PushSource.class);
Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
+ map.put("inputDirectory", directory.toString());
map.put("keepFile", Boolean.FALSE);
-
+
try {
generateFiles(1);
fileConfig = FileSourceConfig.load(map);
@@ -63,35 +63,35 @@
executor.execute(consumerThread);
executor.execute(cleanupThread);
Thread.sleep(2000);
-
+
for (File produced : producedFiles) {
verify(workQueue, times(1)).offer(produced);
verify(inProcess, times(1)).add(produced);
verify(inProcess, times(1)).remove(produced);
verify(recentlyProcessed, times(1)).add(produced);
}
-
+
verify(workQueue, times(1)).offer(any(File.class));
verify(workQueue, atLeast(1)).take();
verify(inProcess, times(1)).add(any(File.class));
verify(inProcess, times(1)).remove(any(File.class));
verify(recentlyProcessed, times(1)).add(any(File.class));
- verify(recentlyProcessed, times(2)).take();
+ verify(recentlyProcessed, times(2)).take();
} catch (InterruptedException | ExecutionException e) {
fail("Unable to generate files" + e.getLocalizedMessage());
- }
+ }
}
-
+
@Test
public final void mulitpleFileTest() throws IOException {
-
+
consumer = Mockito.mock(PushSource.class);
Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
+ map.put("inputDirectory", directory.toString());
map.put("keepFile", Boolean.FALSE);
-
+
try {
generateFiles(50);
fileConfig = FileSourceConfig.load(map);
@@ -102,37 +102,37 @@
executor.execute(consumerThread);
executor.execute(cleanupThread);
Thread.sleep(2000);
-
+
for (File produced : producedFiles) {
verify(workQueue, times(1)).offer(produced);
verify(inProcess, times(1)).add(produced);
verify(inProcess, times(1)).remove(produced);
verify(recentlyProcessed, times(1)).add(produced);
}
-
+
verify(workQueue, times(50)).offer(any(File.class));
verify(workQueue, atLeast(50)).take();
verify(inProcess, times(50)).add(any(File.class));
verify(inProcess, times(50)).remove(any(File.class));
verify(recentlyProcessed, times(50)).add(any(File.class));
verify(recentlyProcessed, times(50)).add(any(File.class));
- verify(recentlyProcessed, times(51)).take();
+ verify(recentlyProcessed, times(51)).take();
} catch (InterruptedException | ExecutionException e) {
fail("Unable to generate files" + e.getLocalizedMessage());
- }
+ }
}
-
+
@Test
public final void keepFileTest() throws IOException {
-
+
consumer = Mockito.mock(PushSource.class);
Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
+ map.put("inputDirectory", directory.toString());
map.put("keepFile", Boolean.TRUE);
map.put("pollingInterval", 1000L);
-
+
try {
generateFiles(1);
fileConfig = FileSourceConfig.load(map);
@@ -143,55 +143,55 @@
executor.execute(consumerThread);
executor.execute(cleanupThread);
Thread.sleep(7900); // Should pull the same file 5 times?
-
+
for (File produced : producedFiles) {
verify(workQueue, atLeast(4)).offer(produced);
verify(inProcess, atLeast(4)).add(produced);
verify(inProcess, atLeast(4)).remove(produced);
verify(recentlyProcessed, atLeast(4)).add(produced);
}
-
- verify(recentlyProcessed, atLeast(5)).take();
+
+ verify(recentlyProcessed, atLeast(5)).take();
} catch (InterruptedException | ExecutionException e) {
fail("Unable to generate files" + e.getLocalizedMessage());
- }
+ }
}
-
+
@Test
public final void continuousRunTest() throws IOException {
-
+
consumer = Mockito.mock(PushSource.class);
Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
+ map.put("inputDirectory", directory.toString());
map.put("keepFile", Boolean.FALSE);
map.put("pollingInterval", 100);
fileConfig = FileSourceConfig.load(map);
-
+
try {
// Start producing files, with a .1 sec delay between
- generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1, TMP_DIR, "continuous", ".txt", getPermissions());
+ generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1, directory.toString(), "continuous", ".txt", getPermissions());
executor.execute(generatorThread);
-
+
listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
executor.execute(listingThread);
executor.execute(consumerThread);
executor.execute(cleanupThread);
-
+
// Run for 30 seconds
Thread.sleep(30000);
-
+
// Stop producing files
generatorThread.halt();
-
+
// Let the consumer catch up
while (!workQueue.isEmpty() && !inProcess.isEmpty() && !recentlyProcessed.isEmpty()) {
Thread.sleep(2000);
}
-
+
// Make sure every single file was processed.
for (File produced : producedFiles) {
verify(workQueue, times(1)).offer(produced);
@@ -199,29 +199,29 @@
verify(inProcess, times(1)).remove(produced);
verify(recentlyProcessed, times(1)).add(produced);
}
-
+
} catch (InterruptedException e) {
fail("Unable to generate files" + e.getLocalizedMessage());
- }
+ }
}
-
+
@Test
public final void multipleConsumerTest() throws IOException {
-
+
consumer = Mockito.mock(PushSource.class);
Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
-
+
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", TMP_DIR);
+ map.put("inputDirectory", directory.toString());
map.put("keepFile", Boolean.FALSE);
map.put("pollingInterval", 100);
fileConfig = FileSourceConfig.load(map);
-
+
try {
// Start producing files, with a .1 sec delay between
- generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1, TMP_DIR, "continuous", ".txt", getPermissions());
+ generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1, directory.toString(), "continuous", ".txt", getPermissions());
executor.execute(generatorThread);
-
+
listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
FileConsumerThread consumerThread2 = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
@@ -232,18 +232,18 @@
executor.execute(consumerThread2);
executor.execute(consumerThread3);
executor.execute(cleanupThread);
-
+
// Run for 30 seconds
Thread.sleep(30000);
-
+
// Stop producing files
generatorThread.halt();
-
+
// Let the consumer catch up
while (!workQueue.isEmpty() && !inProcess.isEmpty() && !recentlyProcessed.isEmpty()) {
Thread.sleep(2000);
}
-
+
// Make sure every single file was processed exactly once.
for (File produced : producedFiles) {
verify(workQueue, times(1)).offer(produced);
@@ -251,9 +251,9 @@
verify(inProcess, times(1)).remove(produced);
verify(recentlyProcessed, times(1)).add(produced);
}
-
+
} catch (InterruptedException e) {
fail("Unable to generate files" + e.getLocalizedMessage());
- }
+ }
}
}
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java
index 714deac..df7600b 100644
--- a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/sink/StringSinkTests.java
@@ -18,8 +18,23 @@
*/
package org.apache.pulsar.io.flume.sink;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import com.google.common.collect.Maps;
-import org.apache.flume.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
@@ -28,21 +43,13 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.flume.AbstractFlumeTests;
+import org.junit.Assert;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import org.junit.Assert;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.mockito.Mockito.*;
public class StringSinkTests extends AbstractFlumeTests {
@@ -55,13 +62,11 @@
private AvroSource source;
private Channel channel;
- private InetAddress localhost;
@BeforeMethod
public void setUp() throws Exception {
mockRecord = mock(Record.class);
mockSinkContext = mock(SinkContext.class);
- localhost = InetAddress.getByName("127.0.0.1");
source = new AvroSource();
channel = new MemoryChannel();
Context context = new Context();
diff --git a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java
index 8a3ebda..e667cb2 100644
--- a/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java
+++ b/pulsar-io/flume/src/test/java/org/apache/pulsar/io/flume/source/StringSourceTests.java
@@ -20,6 +20,8 @@
import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
+
+import org.apache.bookkeeper.test.PortManager;
import org.apache.flume.*;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
@@ -54,7 +56,7 @@
}
Context context = new Context();
context.put("hostname", "127.0.0.1");
- context.put("port", "44444");
+ context.put("port", "44445");
context.put("batch-size", String.valueOf(2));
context.put("connect-timeout", String.valueOf(2000L));
context.put("request-timeout", String.valueOf(3000L));
@@ -70,9 +72,9 @@
@AfterMethod
public void tearDown() throws Exception {
sink.stop();
+ sink = null;
}
-
@Test
public void TestOpenAndReadSource() throws Exception {
Map<String, Object> conf = Maps.newHashMap();
diff --git a/pulsar-io/flume/src/test/resources/flume/sink.conf b/pulsar-io/flume/src/test/resources/flume/sink.conf
index e45e6f4..d8febf2 100644
--- a/pulsar-io/flume/src/test/resources/flume/sink.conf
+++ b/pulsar-io/flume/src/test/resources/flume/sink.conf
@@ -27,7 +27,7 @@
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 127.0.0.1
-a1.sources.r1.port = 44444
+a1.sources.r1.port = 44445
# Describe the sink
a1.sinks.k1.type = org.apache.pulsar.io.flume.source.SinkOfFlume