SAMZA-2544: Adding cleanup for Container Placement tests using threads, coordinator stream stores
**Improvement** [Bug fix]: Adding teardown / cleanup to Container placements integration tests
**API changes:** None
**Symptoms:**: Creates OOMs when test suite is executed
```
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "RMI RenewClean-[]"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Container Allocator Thread"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "RMI RenewClean-[]"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "ContainerPlacement Request Allocator Thread"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Container Allocator Thread"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Container Allocator Thread"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Container Allocator Thread"
```
**Upgrade Instructions:** None
**Usage Instructions:** None
Author: Sanil15 <sanil.jain15@gmail.com>
Reviewers: mynameborat <bharathkk@apache.org>
Closes #1378 from Sanil15/oom-fix
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index 18e6cb4..87bf85e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -51,6 +51,7 @@
import org.apache.samza.testUtils.MockHttpServer;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -164,6 +165,13 @@
clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager);
}
+ @After
+ public void teardown() {
+ containerPlacementMetadataStore.stop();
+ cpm.stop();
+ coordinatorStreamStore.close();
+ }
+
public void setupStandby() throws Exception {
state = new SamzaApplicationState(getJobModelManagerWithHostAffinityWithStandby(ImmutableMap.of("0", "host-1", "1", "host-2", "0-0", "host-2", "1-0", "host-1")));
callback = mock(ClusterResourceManager.Callback.class);
@@ -266,9 +274,10 @@
@Test(timeout = 30000)
public void testActionQueuingForConsecutivePlacementActions() throws Exception {
// Spawn a Request Allocator Thread
- Thread requestAllocatorThread = new Thread(
- new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config)),
- "ContainerPlacement Request Allocator Thread");
+ ContainerPlacementRequestAllocator requestAllocator =
+ new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config));
+ Thread requestAllocatorThread = new Thread(requestAllocator, "ContainerPlacement Request Allocator Thread");
+
requestAllocatorThread.start();
doAnswer(new Answer<Void>() {
@@ -364,6 +373,9 @@
// Requests from Previous deploy must be cleaned
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestUUIDMoveBad).isPresent());
assertFalse(containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMoveBad).isPresent());
+
+ // Cleanup Request Allocator Thread
+ cleanUpRequestAllocatorThread(requestAllocator, requestAllocatorThread);
}
@Test(timeout = 10000)
@@ -837,9 +849,9 @@
setupStandby();
// Spawn a Request Allocator Thread
- Thread requestAllocatorThread = new Thread(
- new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config)),
- "ContainerPlacement Request Allocator Thread");
+ ContainerPlacementRequestAllocator requestAllocator =
+ new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config));
+ Thread requestAllocatorThread = new Thread(requestAllocator, "ContainerPlacement Request Allocator Thread");
requestAllocatorThread.start();
doAnswer(new Answer<Void>() {
@@ -976,6 +988,9 @@
// Request should be deleted as soon as ita accepted / being acted upon
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(standbyMoveRequest).isPresent());
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(activeMoveRequest).isPresent());
+
+ // Cleanup Request Allocator Thread
+ cleanUpRequestAllocatorThread(requestAllocator, requestAllocatorThread);
}
private void assertResponseMessage(ContainerPlacementResponseMessage responseMessage,
@@ -1011,4 +1026,13 @@
// Request shall be deleted as soon as it is acted upon
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent());
}
+
+ private void cleanUpRequestAllocatorThread(ContainerPlacementRequestAllocator requestAllocator, Thread containerPlacementRequestAllocatorThread) {
+ requestAllocator.stop();
+ try {
+ containerPlacementRequestAllocatorThread.join();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
}