SAMZA-2721: Container should exit with non-zero status code in case of errors during launch
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 7314a86..9cc8121 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.runtime;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -99,10 +100,11 @@
     run(appDesc, jobName, jobId, containerId, executionEnvContainerId, samzaEpochId, jobModel, config,
         buildExternalContext(config));
 
-    System.exit(0);
+    exitProcess(0);
   }
 
-  private static void run(
+  @VisibleForTesting
+  static void run(
       ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
       String jobName,
       String jobId,
@@ -112,9 +114,15 @@
       JobModel jobModel,
       Config config,
       Optional<ExternalContext> externalContextOptional) {
-    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(config, new MetricsRegistryMap());
+    CoordinatorStreamStore coordinatorStreamStore = buildCoordinatorStreamStore(config, new MetricsRegistryMap());
     coordinatorStreamStore.init();
 
+    /*
+     * We track the exit code and only trigger exit in the finally block to make sure we are able to execute all the
+     * clean up steps. The prior implementation exited from within the try block, so some clean up steps were missed.
+     */
+    int exitCode = 0;
+
     try {
       TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
       LocalityManager localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE));
@@ -179,15 +187,53 @@
 
       if (containerRunnerException != null) {
         log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
-        System.exit(1);
+        exitCode = 1;
       }
     } catch (Throwable e) {
-      log.error("Container stopped with Exception. ", containerRunnerException);
+      /*
+       * Two separate log statements are used so that the entire stack trace is printed as part of the logs. A
+       * single log statement with a custom format would require explicitly fetching the stack trace and null
+       * checks, which makes the code slightly harder to read in comparison with the current choice.
+       */
+      log.error("Exiting the process due to", e);
+      log.error("Container runner exception: ", containerRunnerException);
+      exitCode = 1;
     } finally {
-      coordinatorStreamStore.close();
+      try {
+        coordinatorStreamStore.close();
+      } catch (Throwable closeFailure) {
+        // A failed clean up is an error too; record it here so that it cannot bypass the exit handling below.
+        log.error("Failed to close the coordinator stream store.", closeFailure);
+        exitCode = 1;
+      }
+      /*
+       * Only exit in the scenario of non-zero exit code in order to maintain parity with current implementation where
+       * the method completes when no errors are encountered.
+       */
+      if (exitCode != 0) {
+        exitProcess(exitCode);
+      }
     }
   }
 
+  /**
+   * Creates the {@link CoordinatorStreamStore} used by {@code run}. Kept as a separate package-private method so
+   * that tests can substitute the store.
+   */
+  @VisibleForTesting
+  static CoordinatorStreamStore buildCoordinatorStreamStore(Config config, MetricsRegistryMap metricsRegistryMap) {
+    return new CoordinatorStreamStore(config, metricsRegistryMap);
+  }
+
+  /**
+   * Terminates the JVM with the given status via {@link System#exit(int)}. Kept as a separate package-private
+   * method so that tests can intercept process termination.
+   */
+  @VisibleForTesting
+  static void exitProcess(int status) {
+    System.exit(status);
+  }
+
   private static Optional<ExternalContext> buildExternalContext(Config config) {
     /*
      * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
new file mode 100644
index 0000000..ec57991
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.job.model.JobModel;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests for the process-exit handling of {@link ContainerLaunchUtil}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ContainerLaunchUtil.class)
+public class TestContainerLaunchUtil {
+  private static final String JOB_NAME = "test-job";
+  private static final String JOB_ID = "test-job-i001";
+  private static final String CONTAINER_ID = "test-job-container-0001";
+
+  private static final ApplicationDescriptorImpl APP_DESC = mock(ApplicationDescriptorImpl.class);
+  private static final JobModel JOB_MODEL = mock(JobModel.class);
+  private static final Config CONFIG = mock(Config.class);
+
+  /**
+   * Runs the container with mocked inputs and expects {@code exitProcess(1)} to be invoked and the coordinator
+   * stream store to be closed.
+   */
+  @Test
+  public void testRunWithException() throws Exception {
+    final CountDownLatch completionLatch = new CountDownLatch(1);
+    final CoordinatorStreamStore coordinatorStreamStore = mock(CoordinatorStreamStore.class);
+    PowerMockito.mockStatic(ContainerLaunchUtil.class);
+    PowerMockito.doReturn(coordinatorStreamStore)
+        .when(ContainerLaunchUtil.class, "buildCoordinatorStreamStore", eq(CONFIG), any());
+    // exitProcess is stubbed so the test JVM is not terminated; the latch records the call with status 1.
+    PowerMockito.doAnswer(invocation -> {
+      completionLatch.countDown();
+      return null;
+    }).when(ContainerLaunchUtil.class, "exitProcess", eq(1));
+    // mockStatic replaces every static method, so the method under test must be explicitly un-mocked.
+    PowerMockito.doCallRealMethod()
+        .when(ContainerLaunchUtil.class, "run", eq(APP_DESC), eq(JOB_NAME), eq(JOB_ID), eq(CONTAINER_ID), any(), any(),
+            eq(JOB_MODEL), eq(CONFIG), any());
+
+    ContainerLaunchUtil.run(APP_DESC, JOB_NAME, JOB_ID, CONTAINER_ID, Optional.empty(), Optional.empty(), JOB_MODEL,
+        CONFIG, Optional.empty());
+    assertTrue(completionLatch.await(1, TimeUnit.SECONDS));
+    // The store must be closed on the failure path too; skipped clean up was the problem with the old early exit.
+    verify(coordinatorStreamStore).close();
+  }
+}