SAMZA-2721: Container should exit with non-zero status code in case of errors during launch
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 7314a86..9cc8121 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -19,6 +19,7 @@
package org.apache.samza.runtime;
+import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -99,10 +100,11 @@
run(appDesc, jobName, jobId, containerId, executionEnvContainerId, samzaEpochId, jobModel, config,
buildExternalContext(config));
- System.exit(0);
+ exitProcess(0);
}
- private static void run(
+ @VisibleForTesting
+ static void run(
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
String jobName,
String jobId,
@@ -112,9 +114,15 @@
JobModel jobModel,
Config config,
Optional<ExternalContext> externalContextOptional) {
- CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(config, new MetricsRegistryMap());
+ CoordinatorStreamStore coordinatorStreamStore = buildCoordinatorStreamStore(config, new MetricsRegistryMap());
coordinatorStreamStore.init();
+ /*
+ * Track the exit code and trigger the exit only from the finally block, so that all clean-up steps run first.
+ * The previous implementation exited early, which caused some clean-up steps to be skipped.
+ */
+ int exitCode = 0;
+
try {
TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
LocalityManager localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE));
@@ -179,15 +187,39 @@
if (containerRunnerException != null) {
log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
- System.exit(1);
+ exitCode = 1;
}
} catch (Throwable e) {
- log.error("Container stopped with Exception. ", containerRunnerException);
+ /*
+ * Two separate log statements are used so that the full stack traces of both the caught exception and the
+ * container runner exception are logged. A single statement with a custom format would require explicitly
+ * fetching the stack traces and adding null checks, which is harder to read.
+ */
+ log.error("Exiting the process due to", e);
+ log.error("Container runner exception: ", containerRunnerException);
+ exitCode = 1;
} finally {
coordinatorStreamStore.close();
+ /*
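+ * Exit only on a non-zero exit code, preserving the existing behavior in which the method returns normally when
+ * no errors occur.
+ * NOTE(review): if coordinatorStreamStore.close() above throws, this exit is skipped and the exception propagates
+ * instead; consider guarding close() with its own try/finally so that a failed launch always exits non-zero.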
+ */
+ if (exitCode != 0) {
+ exitProcess(exitCode);
+ }
}
}
+ @VisibleForTesting
+ static CoordinatorStreamStore buildCoordinatorStreamStore(Config config, MetricsRegistryMap metricsRegistryMap) {
+ return new CoordinatorStreamStore(config, metricsRegistryMap);
+ }
+
+  /**
+   * Terminates the JVM with the given status. It is a separate method so that tests can stub process
+   * termination instead of actually exiting.
+   *
+   * @param status process exit status; this class uses 0 for success and 1 for failure
+   */
+  @VisibleForTesting
+  static void exitProcess(int status) {
+    // Equivalent to System.exit(status), which delegates to Runtime.exit.
+    Runtime.getRuntime().exit(status);
+  }
+
private static Optional<ExternalContext> buildExternalContext(Config config) {
/*
* By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
new file mode 100644
index 0000000..ec57991
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.job.model.JobModel;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ContainerLaunchUtil.class)
+public class TestContainerLaunchUtil {
+  private static final String JOB_NAME = "test-job";
+  private static final String JOB_ID = "test-job-i001";
+  private static final String CONTAINER_ID = "test-job-container-0001";
+
+  private static final ApplicationDescriptorImpl APP_DESC = mock(ApplicationDescriptorImpl.class);
+  private static final JobModel JOB_MODEL = mock(JobModel.class);
+  private static final Config CONFIG = mock(Config.class);
+
+  /**
+   * Verifies that when {@code run} fails, it still initializes and closes the coordinator stream store
+   * (the clean-up step that must not be skipped) and then asks the process to exit with status 1.
+   *
+   * <p>NOTE(review): the mocked descriptor, job model and config are expected to make the launch fail inside
+   * {@code run}; confirm which step throws if this test's behavior changes.
+   */
+  @Test
+  public void testRunWithException() throws Exception {
+    final CountDownLatch completionLatch = new CountDownLatch(1);
+    final CoordinatorStreamStore coordinatorStreamStore = mock(CoordinatorStreamStore.class);
+
+    PowerMockito.mockStatic(ContainerLaunchUtil.class);
+    PowerMockito.doReturn(coordinatorStreamStore)
+        .when(ContainerLaunchUtil.class, "buildCoordinatorStreamStore", eq(CONFIG), any());
+    // Stub process termination so the test JVM survives; record that an exit with status 1 was requested.
+    PowerMockito.doAnswer(invocation -> {
+      completionLatch.countDown();
+      return null;
+    }).when(ContainerLaunchUtil.class, "exitProcess", eq(1));
+    // mockStatic stubs every static method of the class, so re-enable the real run() that is under test.
+    PowerMockito.doCallRealMethod()
+        .when(ContainerLaunchUtil.class, "run", eq(APP_DESC), eq(JOB_NAME), eq(JOB_ID), eq(CONTAINER_ID), any(), any(),
+            eq(JOB_MODEL), eq(CONFIG), any());
+
+    ContainerLaunchUtil.run(APP_DESC, JOB_NAME, JOB_ID, CONTAINER_ID, Optional.empty(), Optional.empty(), JOB_MODEL,
+        CONFIG, Optional.empty());
+
+    assertTrue(completionLatch.await(1, TimeUnit.SECONDS));
+    // Deferring the exit to the finally block exists so that clean-up runs; assert that it actually did.
+    verify(coordinatorStreamStore).init();
+    verify(coordinatorStreamStore).close();
+  }
+}