APEXCORE-734 StramLocalCluster may not terminate properly
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index cfbd047..41e358e 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -463,103 +463,126 @@
@SuppressWarnings({"SleepWhileInLoop", "ResultOfObjectAllocationIgnored"})
public void run(long runMillis)
{
- if (!perContainerBufferServer) {
- StreamingContainer.eventloop.start();
- bufferServer = new Server(StreamingContainer.eventloop, 0, 1024 * 1024, 8);
- try {
- bufferServer.setSpoolStorage(new DiskStorage());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run().getPort());
- LOG.info("Buffer server started: {}", bufferServerAddress);
- }
-
- long endMillis = System.currentTimeMillis() + runMillis;
+ Thread eventLoopThread = null;
List<Thread> containerThreads = new LinkedList<>();
-
- while (!appDone) {
-
- for (String containerIdStr: dnmgr.containerStopRequests.values()) {
- // teardown child thread
- StreamingContainer c = childContainers.get(containerIdStr);
- if (c != null) {
- ContainerHeartbeatResponse r = new ContainerHeartbeatResponse();
- r.shutdown = StreamingContainerUmbilicalProtocol.ShutdownType.ABORT;
- c.processHeartbeatResponse(r);
+ try {
+ if (!perContainerBufferServer) {
+ eventLoopThread = StreamingContainer.eventloop.start();
+ bufferServer = new Server(StreamingContainer.eventloop, 0, 1024 * 1024, 8);
+ try {
+ bufferServer.setSpoolStorage(new DiskStorage());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- dnmgr.containerStopRequests.remove(containerIdStr);
- LOG.info("Container {} restart.", containerIdStr);
- dnmgr.scheduleContainerRestart(containerIdStr);
- //dnmgr.removeContainerAgent(containerIdStr);
+ bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run().getPort());
+ LOG.info("Buffer server started: {}", bufferServerAddress);
}
- // start containers
- while (!dnmgr.containerStartRequests.isEmpty()) {
- ContainerStartRequest cdr = dnmgr.containerStartRequests.poll();
- if (cdr != null) {
- new LocalStreamingContainerLauncher(cdr, containerThreads);
+ long endMillis = System.currentTimeMillis() + runMillis;
+
+ while (!appDone) {
+
+ for (String containerIdStr : dnmgr.containerStopRequests.values()) {
+ // teardown child thread
+ StreamingContainer c = childContainers.get(containerIdStr);
+ if (c != null) {
+ ContainerHeartbeatResponse r = new ContainerHeartbeatResponse();
+ r.shutdown = StreamingContainerUmbilicalProtocol.ShutdownType.ABORT;
+ c.processHeartbeatResponse(r);
+ }
+ dnmgr.containerStopRequests.remove(containerIdStr);
+ LOG.info("Container {} restart.", containerIdStr);
+ dnmgr.scheduleContainerRestart(containerIdStr);
+ //dnmgr.removeContainerAgent(containerIdStr);
}
- }
- if (heartbeatMonitoringEnabled) {
- // monitor child containers
- dnmgr.monitorHeartbeat(false);
- }
+ // start containers
+ while (!dnmgr.containerStartRequests.isEmpty()) {
+ ContainerStartRequest cdr = dnmgr.containerStartRequests.poll();
+ if (cdr != null) {
+ new LocalStreamingContainerLauncher(cdr, containerThreads);
+ }
+ }
- if (childContainers.isEmpty() && dnmgr.containerStartRequests.isEmpty()) {
- appDone = true;
- }
+ if (heartbeatMonitoringEnabled) {
+ // monitor child containers
+ dnmgr.monitorHeartbeat(false);
+ }
- if (runMillis > 0 && System.currentTimeMillis() > endMillis) {
- appDone = true;
- }
-
- try {
- if (exitCondition != null && exitCondition.call()) {
+ if (childContainers.isEmpty() && dnmgr.containerStartRequests.isEmpty()) {
appDone = true;
}
- } catch (Exception ex) {
- break;
- }
- if (Thread.interrupted()) {
- break;
- }
+ if (runMillis > 0 && System.currentTimeMillis() > endMillis) {
+ appDone = true;
+ }
- if (!appDone) {
try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.debug("Sleep interrupted", e);
+ if (exitCondition != null && exitCondition.call()) {
+ appDone = true;
+ }
+ } catch (Exception ex) {
break;
}
+
+ if (Thread.interrupted()) {
+ break;
+ }
+
+ if (!appDone) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.debug("Sleep interrupted", e);
+ break;
+ }
+ }
+ }
+ } finally {
+ for (LocalStreamingContainer lsc : childContainers.values()) {
+ injectShutdown.put(lsc.getContainerId(), lsc);
+ lsc.triggerHeartbeat();
+ }
+
+ for (Thread thread : containerThreads) {
+ try {
+ thread.join(1000);
+ } catch (InterruptedException e) {
+ LOG.debug("Wait for {} to terminate interrupted", thread, e);
+ }
+ if (thread.isAlive()) {
+ LOG.warn("Container thread {} is still alive", thread.getName());
+ }
+ }
+
+ try {
+ dnmgr.teardown();
+ } catch (RuntimeException e) {
+ LOG.warn("Exception during StreamingContainerManager teardown", e);
+ }
+
+ if (bufferServerAddress != null) {
+ try {
+ bufferServer.stop();
+ } catch (RuntimeException e) {
+ LOG.warn("Exception during BufferServer stop", e);
+ }
+ }
+
+ if (eventLoopThread != null) {
+ try {
+ StreamingContainer.eventloop.stop();
+ eventLoopThread.join(1000);
+ } catch (InterruptedException ie) {
+ LOG.debug("Wait for {} to terminate interrupted", eventLoopThread.getName(), ie);
+ } catch (RuntimeException e) {
+ LOG.warn("Exception during {} stop", StreamingContainer.eventloop, e);
+ }
+ if (StreamingContainer.eventloop.isActive()) {
+ LOG.warn("Event loop {} is still active", StreamingContainer.eventloop);
+ }
}
}
-
- for (LocalStreamingContainer lsc: childContainers.values()) {
- injectShutdown.put(lsc.getContainerId(), lsc);
- lsc.triggerHeartbeat();
- }
-
- for (Thread thread: containerThreads) {
- try {
- thread.join(1000);
- } catch (InterruptedException e) {
- LOG.debug("Sleep interrupted", e);
- }
- if (thread.isAlive()) {
- LOG.warn("Container thread {} didn't finish", thread.getName());
- }
- }
-
- dnmgr.teardown();
-
LOG.info("Application finished.");
- if (!perContainerBufferServer) {
- bufferServer.stop();
- StreamingContainer.eventloop.stop();
- }
}
-
}