SAMZA-2584: Refactor ClusterBasedJobCoordinator (#1424)
Issues: In the deployment flow of a beam job, we will have a complicate flow: ClusterBasedJobCoordinator#main -> Beam main class -> JobCoordinatorLaunchUtil -> ClusterBasedJobCoordinator.
Changes:
1. Move ClusterBasedJobCoordinator#main to ClusterBasedJobCoordinatorRunner#main
2. Update run-jc.sh to invoke ClusterBasedJobCoordinatorRunner
Tests:
1. unit tests
2. Deployed hello samza job successfully with the change following instructions on http://samza.apache.org/startup/hello-samza/latest/
API Changes: None
Upgrade Instructions: None
Usage Instructions: None
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 68e2f77..6295ed6 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -19,29 +19,19 @@
package org.apache.samza.clustermanager;
import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
-import org.apache.samza.application.ApplicationUtil;
-import org.apache.samza.classloader.IsolatingClassLoaderFactory;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.LocalityManager;
@@ -52,7 +42,6 @@
import org.apache.samza.coordinator.PartitionChangeException;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.coordinator.StreamRegexMonitor;
-import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
@@ -63,16 +52,12 @@
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.JmxServer;
import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.storage.ChangelogStreamManager;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.ConfigUtil;
-import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.DiagnosticsUtil;
-import org.apache.samza.util.SplitDeploymentUtil;
import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -465,142 +450,4 @@
ContainerProcessManager createContainerProcessManager() {
return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager);
}
-
- /**
- * The entry point for the {@link ClusterBasedJobCoordinator}.
- */
- public static void main(String[] args) {
- Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
- LOG.error("Uncaught exception in ClusterBasedJobCoordinator::main. Exiting job coordinator", exception);
- System.exit(1);
- });
- if (!SplitDeploymentUtil.isSplitDeploymentEnabled()) {
- // no isolation enabled, so can just execute runClusterBasedJobCoordinator directly
- runClusterBasedJobCoordinator(args);
- } else {
- SplitDeploymentUtil.runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(),
- ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", args);
- }
- System.exit(0);
- }
-
- /**
- * This is the actual execution for the {@link ClusterBasedJobCoordinator}. This is separated out from
- * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
- */
- private static void runClusterBasedJobCoordinator(String[] args) {
- final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG);
- final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG);
-
- if (!StringUtils.isBlank(submissionEnv)) {
- Config submissionConfig;
- try {
- //Read and parse the coordinator system config.
- LOG.info("Parsing submission config {}", submissionEnv);
- submissionConfig =
- new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
- LOG.info("Using the submission config: {}.", submissionConfig);
- } catch (IOException e) {
- LOG.error("Exception while reading submission config", e);
- throw new SamzaException(e);
- }
-
- ApplicationConfig appConfig = new ApplicationConfig(submissionConfig);
-
- /*
- * Invoke app.main.class with app.main.args when present.
- * For Beam jobs, app.main.class will be Beam's main class
- * and app.main.args will be Beam's pipeline options.
- */
- if (appConfig.getAppMainClass().isPresent()) {
- String className = appConfig.getAppMainClass().get();
- LOG.info("Invoke main {}", className);
- try {
- Class<?> cls = Class.forName(className);
- Method mainMethod = cls.getMethod("main", String[].class);
- mainMethod.invoke(null, (Object) toArgs(appConfig));
- } catch (Exception e) {
- throw new SamzaException(e);
- }
- } else {
- JobConfig jobConfig = new JobConfig(submissionConfig);
-
- if (!jobConfig.getConfigLoaderFactory().isPresent()) {
- throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader");
- }
-
- // load full job config with ConfigLoader
- Config originalConfig = ConfigUtil.loadConfig(submissionConfig);
-
- JobCoordinatorLaunchUtil.run(ApplicationUtil.fromConfig(originalConfig), originalConfig);
- }
-
- LOG.info("Finished running ClusterBasedJobCoordinator");
- } else {
- // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
- Config coordinatorSystemConfig;
- try {
- //Read and parse the coordinator system config.
- LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
- coordinatorSystemConfig =
- new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
- LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
- } catch (IOException e) {
- LOG.error("Exception while reading coordinator stream config", e);
- throw new SamzaException(e);
- }
- ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig);
- jc.run();
- LOG.info("Finished running ClusterBasedJobCoordinator");
- }
- }
-
- /**
- * Initialize {@link ClusterBasedJobCoordinator} with coordinator stream config, full job config will be fetched from
- * coordinator stream.
- *
- * @param metadataStoreConfig to initialize {@link MetadataStore}
- * @return {@link ClusterBasedJobCoordinator}
- */
- // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
- @VisibleForTesting
- static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) {
- MetricsRegistryMap metrics = new MetricsRegistryMap();
-
- CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(metadataStoreConfig, metrics);
- coordinatorStreamStore.init();
- Config config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
-
- return new ClusterBasedJobCoordinator(metrics, coordinatorStreamStore, config);
- }
-
- /**
- * Convert Samza config to command line arguments to invoke app.main.class
- *
- * @param config Samza config to convert.
- * @return converted command line arguments.
- */
- @VisibleForTesting
- static String[] toArgs(ApplicationConfig config) {
- List<String> args = new ArrayList<>(config.size() * 2);
-
- config.forEach((key, value) -> {
- if (key.equals(ApplicationConfig.APP_MAIN_ARGS)) {
- /*
- * Converts native beam pipeline options such as
- * --runner=SamzaRunner --maxSourceParallelism=1024
- */
- args.addAll(Arrays.asList(value.split("\\s")));
- } else {
- /*
- * Converts native Samza configs to config override format such as
- * --config job.name=test
- */
- args.add("--config");
- args.add(String.format("%s=%s", key, value));
- }
- });
-
- return args.toArray(new String[0]);
- }
}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
new file mode 100644
index 0000000..107c473
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationUtil;
+import org.apache.samza.classloader.IsolatingClassLoaderFactory;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.util.ConfigUtil;
+import org.apache.samza.util.CoordinatorStreamUtil;
+import org.apache.samza.util.SplitDeploymentUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ClusterBasedJobCoordinatorRunner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClusterBasedJobCoordinatorRunner.class);
+
+ /**
+ * The entry point for the {@link ClusterBasedJobCoordinator}.
+ */
+ public static void main(String[] args) {
+ Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
+ LOG.error("Uncaught exception in ClusterBasedJobCoordinator::main. Exiting job coordinator", exception);
+ System.exit(1);
+ });
+ if (!SplitDeploymentUtil.isSplitDeploymentEnabled()) {
+ // no isolation enabled, so can just execute runClusterBasedJobCoordinator directly
+ runClusterBasedJobCoordinator(args);
+ } else {
+ SplitDeploymentUtil.runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(),
+ ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", args);
+ }
+ System.exit(0);
+ }
+
+ /**
+ * This is the actual execution for the {@link ClusterBasedJobCoordinator}. This is separated out from
+ * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
+ */
+ @VisibleForTesting
+ static void runClusterBasedJobCoordinator(String[] args) {
+ final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG);
+ final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG);
+
+ if (!StringUtils.isBlank(submissionEnv)) {
+ Config submissionConfig;
+ try {
+ //Read and parse the coordinator system config.
+ LOG.info("Parsing submission config {}", submissionEnv);
+ submissionConfig =
+ new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
+ LOG.info("Using the submission config: {}.", submissionConfig);
+ } catch (IOException e) {
+ LOG.error("Exception while reading submission config", e);
+ throw new SamzaException(e);
+ }
+
+ ApplicationConfig appConfig = new ApplicationConfig(submissionConfig);
+
+ /*
+ * Invoke app.main.class with app.main.args when present.
+ * For Beam jobs, app.main.class will be Beam's main class
+ * and app.main.args will be Beam's pipeline options.
+ */
+ if (appConfig.getAppMainClass().isPresent()) {
+ String className = appConfig.getAppMainClass().get();
+ LOG.info("Invoke main {}", className);
+ try {
+ Class<?> cls = Class.forName(className);
+ Method mainMethod = cls.getMethod("main", String[].class);
+ mainMethod.invoke(null, (Object) toArgs(appConfig));
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ } else {
+ JobConfig jobConfig = new JobConfig(submissionConfig);
+
+ if (!jobConfig.getConfigLoaderFactory().isPresent()) {
+ throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader");
+ }
+
+ // load full job config with ConfigLoader
+ Config originalConfig = ConfigUtil.loadConfig(submissionConfig);
+
+ JobCoordinatorLaunchUtil.run(ApplicationUtil.fromConfig(originalConfig), originalConfig);
+ }
+
+ LOG.info("Finished running ClusterBasedJobCoordinator");
+ } else {
+ // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+ Config coordinatorSystemConfig;
+ try {
+ //Read and parse the coordinator system config.
+ LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
+ coordinatorSystemConfig =
+ new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
+ LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
+ } catch (IOException e) {
+ LOG.error("Exception while reading coordinator stream config", e);
+ throw new SamzaException(e);
+ }
+ ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig);
+ jc.run();
+ LOG.info("Finished running ClusterBasedJobCoordinator");
+ }
+ }
+
+ /**
+ * Initialize {@link ClusterBasedJobCoordinator} with coordinator stream config, full job config will be fetched from
+ * coordinator stream.
+ *
+ * @param metadataStoreConfig to initialize {@link org.apache.samza.metadatastore.MetadataStore}
+ * @return {@link ClusterBasedJobCoordinator}
+ */
+ // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+ @VisibleForTesting
+ static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) {
+ MetricsRegistryMap metrics = new MetricsRegistryMap();
+
+ CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(metadataStoreConfig, metrics);
+ coordinatorStreamStore.init();
+ Config config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+
+ return new ClusterBasedJobCoordinator(metrics, coordinatorStreamStore, config);
+ }
+
+ /**
+ * Convert Samza config to command line arguments to invoke app.main.class
+ *
+ * @param config Samza config to convert.
+ * @return converted command line arguments.
+ */
+ @VisibleForTesting
+ static String[] toArgs(ApplicationConfig config) {
+ List<String> args = new ArrayList<>(config.size() * 2);
+
+ config.forEach((key, value) -> {
+ if (key.equals(ApplicationConfig.APP_MAIN_ARGS)) {
+ /*
+ * Converts native beam pipeline options such as
+ * --runner=SamzaRunner --maxSourceParallelism=1024
+ */
+ args.addAll(Arrays.asList(value.split("\\s")));
+ } else {
+ /*
+ * Converts native Samza configs to config override format such as
+ * --config job.name=test
+ */
+ args.add("--config");
+ args.add(String.format("%s=%s", key, value));
+ }
+ });
+
+ return args.toArray(new String[0]);
+ }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 6444451..caa1ffe 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -69,7 +69,7 @@
@RunWith(PowerMockRunner.class)
@PrepareForTest({
CoordinatorStreamUtil.class,
- ClusterBasedJobCoordinator.class,
+ ClusterBasedJobCoordinatorRunner.class,
CoordinatorStreamStore.class,
RemoteJobPlanner.class})
public class TestClusterBasedJobCoordinator {
@@ -112,7 +112,7 @@
CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
producer.writeConfig("test-job", config);
- ClusterBasedJobCoordinator clusterCoordinator = ClusterBasedJobCoordinator.createFromMetadataStore(config);
+ ClusterBasedJobCoordinator clusterCoordinator = ClusterBasedJobCoordinatorRunner.createFromMetadataStore(config);
// change the input system stream metadata
MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());
@@ -132,7 +132,7 @@
CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
producer.writeConfig("test-job", config);
- ClusterBasedJobCoordinator clusterCoordinator = ClusterBasedJobCoordinator.createFromMetadataStore(config);
+ ClusterBasedJobCoordinator clusterCoordinator = ClusterBasedJobCoordinatorRunner.createFromMetadataStore(config);
// change the input system stream metadata
MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());
@@ -150,7 +150,7 @@
Config config = new MapConfig(configMap);
MockitoException stopException = new MockitoException("Stop");
- ClusterBasedJobCoordinator clusterCoordinator = Mockito.spy(ClusterBasedJobCoordinator.createFromMetadataStore(config));
+ ClusterBasedJobCoordinator clusterCoordinator = Mockito.spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(config));
ContainerProcessManager mockContainerProcessManager = mock(ContainerProcessManager.class);
doReturn(true).when(mockContainerProcessManager).shouldShutdown();
StartpointManager mockStartpointManager = mock(StartpointManager.class);
@@ -186,7 +186,7 @@
"--config", "app.class=class1",
"--runner=SamzaRunner",
"--maxSourceParallelism=1024");
- List<String> actual = Arrays.asList(ClusterBasedJobCoordinator.toArgs(appConfig));
+ List<String> actual = Arrays.asList(ClusterBasedJobCoordinatorRunner.toArgs(appConfig));
// cannot assert expected equals to actual as the order can be different.
assertEquals(expected.size(), actual.size());
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinatorRunner.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinatorRunner.java
new file mode 100644
index 0000000..548cb4a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinatorRunner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.application.ApplicationUtil;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.util.ConfigUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+ System.class,
+ ClusterBasedJobCoordinatorRunner.class,
+ ApplicationUtil.class,
+ JobCoordinatorLaunchUtil.class})
+public class TestClusterBasedJobCoordinatorRunner {
+
+ @Test
+ public void testRunClusterBasedJobCoordinator() throws Exception {
+ Config submissionConfig = new MapConfig(ImmutableMap.of(
+ JobConfig.CONFIG_LOADER_FACTORY,
+ PropertiesConfigLoaderFactory.class.getName(),
+ PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",
+ getClass().getResource("/test.properties").getPath()));
+ Config fullConfig = ConfigUtil.loadConfig(submissionConfig);
+ StreamApplication mockApplication = mock(StreamApplication.class);
+ PowerMockito.mockStatic(System.class, ApplicationUtil.class, JobCoordinatorLaunchUtil.class);
+ PowerMockito
+ .when(System.getenv(eq(ShellCommandConfig.ENV_SUBMISSION_CONFIG)))
+ .thenReturn(SamzaObjectMapper.getObjectMapper().writeValueAsString(submissionConfig));
+ PowerMockito
+ .when(ApplicationUtil.fromConfig(any()))
+ .thenReturn(mockApplication);
+ PowerMockito.doNothing().when(JobCoordinatorLaunchUtil.class, "run", mockApplication, fullConfig);
+
+ ClusterBasedJobCoordinatorRunner.runClusterBasedJobCoordinator(null);
+
+ PowerMockito.verifyStatic(times(1));
+ JobCoordinatorLaunchUtil.run(mockApplication, fullConfig);
+ }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
index d1dd8f8..72772ba 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
@@ -18,7 +18,7 @@
*/
package org.apache.samza.util;
-import org.apache.samza.clustermanager.ClusterBasedJobCoordinator;
+import org.apache.samza.clustermanager.ClusterBasedJobCoordinatorRunner;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
@@ -33,18 +33,18 @@
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ClusterBasedJobCoordinator.class})
+@PrepareForTest({ClusterBasedJobCoordinatorRunner.class})
public class TestSplitDeploymentUtil {
@Test
public void testRunWithIsolatingClassLoader() throws Exception {
// partially mock ClusterBasedJobCoordinator (mock runClusterBasedJobCoordinator method only)
- PowerMockito.spy(ClusterBasedJobCoordinator.class);
+ PowerMockito.spy(ClusterBasedJobCoordinatorRunner.class);
// save the context classloader to make sure that it gets set properly once the test is finished
ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader classLoader = mock(ClassLoader.class);
String[] args = new String[]{"arg0", "arg1"};
- doReturn(ClusterBasedJobCoordinator.class).when(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+ doReturn(ClusterBasedJobCoordinatorRunner.class).when(classLoader).loadClass(ClusterBasedJobCoordinatorRunner.class.getName());
// stub the private static method which is called by reflection
PowerMockito.doAnswer(invocation -> {
@@ -53,19 +53,19 @@
// checks that the context classloader is set correctly
assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
return null;
- }).when(ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", any());
+ }).when(ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", any());
try {
SplitDeploymentUtil.runWithClassLoader(classLoader,
- ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", args);
+ ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", args);
assertEquals(previousContextClassLoader, Thread.currentThread().getContextClassLoader());
} finally {
// reset it explicitly just in case runWithClassLoader throws an exception
Thread.currentThread().setContextClassLoader(previousContextClassLoader);
}
// make sure that the classloader got used
- verify(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+ verify(classLoader).loadClass(ClusterBasedJobCoordinatorRunner.class.getName());
// make sure runClusterBasedJobCoordinator only got called once
- verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
+ verifyPrivate(ClusterBasedJobCoordinatorRunner.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
}
}
diff --git a/samza-shell/src/main/bash/run-jc.sh b/samza-shell/src/main/bash/run-jc.sh
index cf3e734..fc8a3c4 100755
--- a/samza-shell/src/main/bash/run-jc.sh
+++ b/samza-shell/src/main/bash/run-jc.sh
@@ -22,4 +22,4 @@
# Set container name system properties for use in Log4J
[[ $JAVA_OPTS != *-Dsamza.container.name* ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.name=samza-job-coordinator"
-exec $(dirname $0)/run-class.sh org.apache.samza.clustermanager.ClusterBasedJobCoordinator "$@"
+exec $(dirname $0)/run-class.sh org.apache.samza.clustermanager.ClusterBasedJobCoordinatorRunner "$@"