SAMZA-2544: Adding cleanup for Container Placement tests using threads, coordinator stream stores

**Improvement** [Bug fix]: Adding teardown / cleanup to Container placements integration tests

**API changes:** None

**Symptoms:**: Creates OOMs when test suite is executed
```
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "RMI RenewClean-[]"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Container Allocator Thread"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "RMI RenewClean-[]"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "ContainerPlacement Request Allocator Thread"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Container Allocator Thread"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Container Allocator Thread"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Container Allocator Thread"
```

**Upgrade Instructions:** None

**Usage Instructions:** None

Author: Sanil15 <sanil.jain15@gmail.com>

Reviewers: mynameborat <bharathkk@apache.org>

Closes #1378 from Sanil15/oom-fix
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index 18e6cb4..87bf85e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -51,6 +51,7 @@
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -164,6 +165,13 @@
             clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager);
   }
 
+  @After
+  public void teardown() {
+    containerPlacementMetadataStore.stop();
+    cpm.stop();
+    coordinatorStreamStore.close();
+  }
+
   public void setupStandby() throws Exception {
     state = new SamzaApplicationState(getJobModelManagerWithHostAffinityWithStandby(ImmutableMap.of("0", "host-1", "1", "host-2", "0-0", "host-2", "1-0", "host-1")));
     callback = mock(ClusterResourceManager.Callback.class);
@@ -266,9 +274,10 @@
   @Test(timeout = 30000)
   public void testActionQueuingForConsecutivePlacementActions() throws Exception {
     // Spawn a Request Allocator Thread
-    Thread requestAllocatorThread = new Thread(
-        new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config)),
-        "ContainerPlacement Request Allocator Thread");
+    ContainerPlacementRequestAllocator requestAllocator =
+        new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config));
+    Thread requestAllocatorThread = new Thread(requestAllocator, "ContainerPlacement Request Allocator Thread");
+
     requestAllocatorThread.start();
 
     doAnswer(new Answer<Void>() {
@@ -364,6 +373,9 @@
     // Requests from Previous deploy must be cleaned
     assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestUUIDMoveBad).isPresent());
     assertFalse(containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMoveBad).isPresent());
+
+    // Cleanup Request Allocator Thread
+    cleanUpRequestAllocatorThread(requestAllocator, requestAllocatorThread);
   }
 
   @Test(timeout = 10000)
@@ -837,9 +849,9 @@
     setupStandby();
 
     // Spawn a Request Allocator Thread
-    Thread requestAllocatorThread = new Thread(
-        new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config)),
-        "ContainerPlacement Request Allocator Thread");
+    ContainerPlacementRequestAllocator requestAllocator =
+        new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config));
+    Thread requestAllocatorThread = new Thread(requestAllocator, "ContainerPlacement Request Allocator Thread");
     requestAllocatorThread.start();
 
     doAnswer(new Answer<Void>() {
@@ -976,6 +988,9 @@
     // Request should be deleted as soon as ita accepted / being acted upon
     assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(standbyMoveRequest).isPresent());
     assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(activeMoveRequest).isPresent());
+
+    // Cleanup Request Allocator Thread
+    cleanUpRequestAllocatorThread(requestAllocator, requestAllocatorThread);
   }
 
   private void assertResponseMessage(ContainerPlacementResponseMessage responseMessage,
@@ -1011,4 +1026,13 @@
     // Request shall be deleted as soon as it is acted upon
     assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent());
   }
+
+  private void cleanUpRequestAllocatorThread(ContainerPlacementRequestAllocator requestAllocator, Thread containerPlacementRequestAllocatorThread) {
+    requestAllocator.stop();
+    try {
+      containerPlacementRequestAllocatorThread.join();
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+  }
 }