SAMZA-2584: Refactor ClusterBasedJobCoordinator (#1424)

Issues: In the deployment flow of a Beam job, the launch path is convoluted (as sketched below): ClusterBasedJobCoordinator#main -> Beam main class -> JobCoordinatorLaunchUtil -> ClusterBasedJobCoordinator, so the same class acts as both the entry point and the coordinator it eventually launches.
Changes:
  1. Move ClusterBasedJobCoordinator#main and its static helpers (runClusterBasedJobCoordinator, createFromMetadataStore, toArgs) to a new ClusterBasedJobCoordinatorRunner class
  2. Update run-jc.sh to invoke ClusterBasedJobCoordinatorRunner
Tests:
  1. Existing and new unit tests
  2. Deployed the hello-samza job successfully with this change, following the instructions at http://samza.apache.org/startup/hello-samza/latest/
API Changes: None
Upgrade Instructions: None
Usage Instructions: None
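For illustration only (not part of the patch), the launch chain before and after this change is roughly:
  Before: run-jc.sh -> ClusterBasedJobCoordinator#main -> Beam main class -> JobCoordinatorLaunchUtil -> ClusterBasedJobCoordinator
  After:  run-jc.sh -> ClusterBasedJobCoordinatorRunner#main -> Beam main class -> JobCoordinatorLaunchUtil -> ClusterBasedJobCoordinator
i.e. ClusterBasedJobCoordinator is no longer an entry point and only hosts the coordinator logic.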
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 68e2f77..6295ed6 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -19,29 +19,19 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
-import org.apache.samza.application.ApplicationUtil;
-import org.apache.samza.classloader.IsolatingClassLoaderFactory;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.LocalityManager;
@@ -52,7 +42,6 @@
 import org.apache.samza.coordinator.PartitionChangeException;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.coordinator.StreamRegexMonitor;
-import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
@@ -63,16 +52,12 @@
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.ConfigUtil;
-import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.DiagnosticsUtil;
-import org.apache.samza.util.SplitDeploymentUtil;
 import org.apache.samza.util.SystemClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -465,142 +450,4 @@
   ContainerProcessManager createContainerProcessManager() {
     return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager);
   }
-
-  /**
-   * The entry point for the {@link ClusterBasedJobCoordinator}.
-   */
-  public static void main(String[] args) {
-    Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
-      LOG.error("Uncaught exception in ClusterBasedJobCoordinator::main. Exiting job coordinator", exception);
-      System.exit(1);
-    });
-    if (!SplitDeploymentUtil.isSplitDeploymentEnabled()) {
-      // no isolation enabled, so can just execute runClusterBasedJobCoordinator directly
-      runClusterBasedJobCoordinator(args);
-    } else {
-      SplitDeploymentUtil.runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(),
-          ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", args);
-    }
-    System.exit(0);
-  }
-
-  /**
-   * This is the actual execution for the {@link ClusterBasedJobCoordinator}. This is separated out from
-   * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
-   */
-  private static void runClusterBasedJobCoordinator(String[] args) {
-    final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG);
-    final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG);
-
-    if (!StringUtils.isBlank(submissionEnv)) {
-      Config submissionConfig;
-      try {
-        //Read and parse the coordinator system config.
-        LOG.info("Parsing submission config {}", submissionEnv);
-        submissionConfig =
-            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
-        LOG.info("Using the submission config: {}.", submissionConfig);
-      } catch (IOException e) {
-        LOG.error("Exception while reading submission config", e);
-        throw new SamzaException(e);
-      }
-
-      ApplicationConfig appConfig = new ApplicationConfig(submissionConfig);
-
-      /*
-       * Invoke app.main.class with app.main.args when present.
-       * For Beam jobs, app.main.class will be Beam's main class
-       * and app.main.args will be Beam's pipeline options.
-       */
-      if (appConfig.getAppMainClass().isPresent()) {
-        String className = appConfig.getAppMainClass().get();
-        LOG.info("Invoke main {}", className);
-        try {
-          Class<?> cls = Class.forName(className);
-          Method mainMethod = cls.getMethod("main", String[].class);
-          mainMethod.invoke(null, (Object) toArgs(appConfig));
-        } catch (Exception e) {
-          throw new SamzaException(e);
-        }
-      } else {
-        JobConfig jobConfig = new JobConfig(submissionConfig);
-
-        if (!jobConfig.getConfigLoaderFactory().isPresent()) {
-          throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader");
-        }
-
-        // load full job config with ConfigLoader
-        Config originalConfig = ConfigUtil.loadConfig(submissionConfig);
-
-        JobCoordinatorLaunchUtil.run(ApplicationUtil.fromConfig(originalConfig), originalConfig);
-      }
-
-      LOG.info("Finished running ClusterBasedJobCoordinator");
-    } else {
-      // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
-      Config coordinatorSystemConfig;
-      try {
-        //Read and parse the coordinator system config.
-        LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-        coordinatorSystemConfig =
-            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
-        LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
-      } catch (IOException e) {
-        LOG.error("Exception while reading coordinator stream config", e);
-        throw new SamzaException(e);
-      }
-      ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig);
-      jc.run();
-      LOG.info("Finished running ClusterBasedJobCoordinator");
-    }
-  }
-
-  /**
-   * Initialize {@link ClusterBasedJobCoordinator} with coordinator stream config, full job config will be fetched from
-   * coordinator stream.
-   *
-   * @param metadataStoreConfig to initialize {@link MetadataStore}
-   * @return {@link ClusterBasedJobCoordinator}
-   */
-  // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
-  @VisibleForTesting
-  static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) {
-    MetricsRegistryMap metrics = new MetricsRegistryMap();
-
-    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(metadataStoreConfig, metrics);
-    coordinatorStreamStore.init();
-    Config config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
-
-    return new ClusterBasedJobCoordinator(metrics, coordinatorStreamStore, config);
-  }
-
-  /**
-   * Convert Samza config to command line arguments to invoke app.main.class
-   *
-   * @param config Samza config to convert.
-   * @return converted command line arguments.
-   */
-  @VisibleForTesting
-  static String[] toArgs(ApplicationConfig config) {
-    List<String> args = new ArrayList<>(config.size() * 2);
-
-    config.forEach((key, value) -> {
-      if (key.equals(ApplicationConfig.APP_MAIN_ARGS)) {
-        /*
-         * Converts native beam pipeline options such as
-         * --runner=SamzaRunner --maxSourceParallelism=1024
-         */
-        args.addAll(Arrays.asList(value.split("\\s")));
-      } else {
-        /*
-         * Converts native Samza configs to config override format such as
-         * --config job.name=test
-         */
-        args.add("--config");
-        args.add(String.format("%s=%s", key, value));
-      }
-    });
-
-    return args.toArray(new String[0]);
-  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
new file mode 100644
index 0000000..107c473
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationUtil;
+import org.apache.samza.classloader.IsolatingClassLoaderFactory;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.util.ConfigUtil;
+import org.apache.samza.util.CoordinatorStreamUtil;
+import org.apache.samza.util.SplitDeploymentUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ClusterBasedJobCoordinatorRunner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ClusterBasedJobCoordinatorRunner.class);
+
+  /**
+   * The entry point for the {@link ClusterBasedJobCoordinator}.
+   */
+  public static void main(String[] args) {
+    Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
+      LOG.error("Uncaught exception in ClusterBasedJobCoordinator::main. Exiting job coordinator", exception);
+      System.exit(1);
+    });
+    if (!SplitDeploymentUtil.isSplitDeploymentEnabled()) {
+      // no isolation enabled, so can just execute runClusterBasedJobCoordinator directly
+      runClusterBasedJobCoordinator(args);
+    } else {
+      SplitDeploymentUtil.runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(),
+          ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", args);
+    }
+    System.exit(0);
+  }
+
+  /**
+   * This is the actual execution for the {@link ClusterBasedJobCoordinator}. This is separated out from
+   * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
+   */
+  @VisibleForTesting
+  static void runClusterBasedJobCoordinator(String[] args) {
+    final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG);
+    final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG);
+
+    if (!StringUtils.isBlank(submissionEnv)) {
+      Config submissionConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing submission config {}", submissionEnv);
+        submissionConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
+        LOG.info("Using the submission config: {}.", submissionConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading submission config", e);
+        throw new SamzaException(e);
+      }
+
+      ApplicationConfig appConfig = new ApplicationConfig(submissionConfig);
+
+      /*
+       * Invoke app.main.class with app.main.args when present.
+       * For Beam jobs, app.main.class will be Beam's main class
+       * and app.main.args will be Beam's pipeline options.
+       */
+      if (appConfig.getAppMainClass().isPresent()) {
+        String className = appConfig.getAppMainClass().get();
+        LOG.info("Invoke main {}", className);
+        try {
+          Class<?> cls = Class.forName(className);
+          Method mainMethod = cls.getMethod("main", String[].class);
+          mainMethod.invoke(null, (Object) toArgs(appConfig));
+        } catch (Exception e) {
+          throw new SamzaException(e);
+        }
+      } else {
+        JobConfig jobConfig = new JobConfig(submissionConfig);
+
+        if (!jobConfig.getConfigLoaderFactory().isPresent()) {
+          throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader");
+        }
+
+        // load full job config with ConfigLoader
+        Config originalConfig = ConfigUtil.loadConfig(submissionConfig);
+
+        JobCoordinatorLaunchUtil.run(ApplicationUtil.fromConfig(originalConfig), originalConfig);
+      }
+
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    } else {
+      // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+      Config coordinatorSystemConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
+        coordinatorSystemConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
+        LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading coordinator stream config", e);
+        throw new SamzaException(e);
+      }
+      ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    }
+  }
+
+  /**
+   * Initialize {@link ClusterBasedJobCoordinator} with coordinator stream config, full job config will be fetched from
+   * coordinator stream.
+   *
+   * @param metadataStoreConfig to initialize {@link org.apache.samza.metadatastore.MetadataStore}
+   * @return {@link ClusterBasedJobCoordinator}
+   */
+  // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+  @VisibleForTesting
+  static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) {
+    MetricsRegistryMap metrics = new MetricsRegistryMap();
+
+    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(metadataStoreConfig, metrics);
+    coordinatorStreamStore.init();
+    Config config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+
+    return new ClusterBasedJobCoordinator(metrics, coordinatorStreamStore, config);
+  }
+
+  /**
+   * Convert Samza config to command line arguments to invoke app.main.class
+   *
+   * @param config Samza config to convert.
+   * @return converted command line arguments.
+   */
+  @VisibleForTesting
+  static String[] toArgs(ApplicationConfig config) {
+    List<String> args = new ArrayList<>(config.size() * 2);
+
+    config.forEach((key, value) -> {
+      if (key.equals(ApplicationConfig.APP_MAIN_ARGS)) {
+        /*
+         * Converts native beam pipeline options such as
+         * --runner=SamzaRunner --maxSourceParallelism=1024
+         */
+        args.addAll(Arrays.asList(value.split("\\s")));
+      } else {
+        /*
+         * Converts native Samza configs to config override format such as
+         * --config job.name=test
+         */
+        args.add("--config");
+        args.add(String.format("%s=%s", key, value));
+      }
+    });
+
+    return args.toArray(new String[0]);
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 6444451..caa1ffe 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -69,7 +69,7 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({
     CoordinatorStreamUtil.class,
-    ClusterBasedJobCoordinator.class,
+    ClusterBasedJobCoordinatorRunner.class,
     CoordinatorStreamStore.class,
     RemoteJobPlanner.class})
 public class TestClusterBasedJobCoordinator {
@@ -112,7 +112,7 @@
     CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
     producer.writeConfig("test-job", config);
 
-    ClusterBasedJobCoordinator clusterCoordinator = ClusterBasedJobCoordinator.createFromMetadataStore(config);
+    ClusterBasedJobCoordinator clusterCoordinator = ClusterBasedJobCoordinatorRunner.createFromMetadataStore(config);
 
     // change the input system stream metadata
     MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());
@@ -132,7 +132,7 @@
     CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
     producer.writeConfig("test-job", config);
 
-    ClusterBasedJobCoordinator clusterCoordinator = ClusterBasedJobCoordinator.createFromMetadataStore(config);
+    ClusterBasedJobCoordinator clusterCoordinator = ClusterBasedJobCoordinatorRunner.createFromMetadataStore(config);
 
     // change the input system stream metadata
     MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());
@@ -150,7 +150,7 @@
     Config config = new MapConfig(configMap);
     MockitoException stopException = new MockitoException("Stop");
 
-    ClusterBasedJobCoordinator clusterCoordinator = Mockito.spy(ClusterBasedJobCoordinator.createFromMetadataStore(config));
+    ClusterBasedJobCoordinator clusterCoordinator = Mockito.spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(config));
     ContainerProcessManager mockContainerProcessManager = mock(ContainerProcessManager.class);
     doReturn(true).when(mockContainerProcessManager).shouldShutdown();
     StartpointManager mockStartpointManager = mock(StartpointManager.class);
@@ -186,7 +186,7 @@
         "--config", "app.class=class1",
         "--runner=SamzaRunner",
         "--maxSourceParallelism=1024");
-    List<String> actual = Arrays.asList(ClusterBasedJobCoordinator.toArgs(appConfig));
+    List<String> actual = Arrays.asList(ClusterBasedJobCoordinatorRunner.toArgs(appConfig));
 
     // cannot assert expected equals to actual as the order can be different.
     assertEquals(expected.size(), actual.size());
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinatorRunner.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinatorRunner.java
new file mode 100644
index 0000000..548cb4a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinatorRunner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.application.ApplicationUtil;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.util.ConfigUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    System.class,
+    ClusterBasedJobCoordinatorRunner.class,
+    ApplicationUtil.class,
+    JobCoordinatorLaunchUtil.class})
+public class TestClusterBasedJobCoordinatorRunner {
+
+  @Test
+  public void testRunClusterBasedJobCoordinator() throws Exception  {
+    Config submissionConfig = new MapConfig(ImmutableMap.of(
+        JobConfig.CONFIG_LOADER_FACTORY,
+        PropertiesConfigLoaderFactory.class.getName(),
+        PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",
+        getClass().getResource("/test.properties").getPath()));
+    Config fullConfig = ConfigUtil.loadConfig(submissionConfig);
+    StreamApplication mockApplication = mock(StreamApplication.class);
+    PowerMockito.mockStatic(System.class, ApplicationUtil.class, JobCoordinatorLaunchUtil.class);
+    PowerMockito
+        .when(System.getenv(eq(ShellCommandConfig.ENV_SUBMISSION_CONFIG)))
+        .thenReturn(SamzaObjectMapper.getObjectMapper().writeValueAsString(submissionConfig));
+    PowerMockito
+        .when(ApplicationUtil.fromConfig(any()))
+        .thenReturn(mockApplication);
+    PowerMockito.doNothing().when(JobCoordinatorLaunchUtil.class, "run", mockApplication, fullConfig);
+
+    ClusterBasedJobCoordinatorRunner.runClusterBasedJobCoordinator(null);
+
+    PowerMockito.verifyStatic(times(1));
+    JobCoordinatorLaunchUtil.run(mockApplication, fullConfig);
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
index d1dd8f8..72772ba 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.util;
 
-import org.apache.samza.clustermanager.ClusterBasedJobCoordinator;
+import org.apache.samza.clustermanager.ClusterBasedJobCoordinatorRunner;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
@@ -33,18 +33,18 @@
 
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ClusterBasedJobCoordinator.class})
+@PrepareForTest({ClusterBasedJobCoordinatorRunner.class})
 public class TestSplitDeploymentUtil {
 
   @Test
   public void testRunWithIsolatingClassLoader() throws Exception {
     // partially mock ClusterBasedJobCoordinator (mock runClusterBasedJobCoordinator method only)
-    PowerMockito.spy(ClusterBasedJobCoordinator.class);
+    PowerMockito.spy(ClusterBasedJobCoordinatorRunner.class);
     // save the context classloader to make sure that it gets set properly once the test is finished
     ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
     ClassLoader classLoader = mock(ClassLoader.class);
     String[] args = new String[]{"arg0", "arg1"};
-    doReturn(ClusterBasedJobCoordinator.class).when(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+    doReturn(ClusterBasedJobCoordinatorRunner.class).when(classLoader).loadClass(ClusterBasedJobCoordinatorRunner.class.getName());
 
     // stub the private static method which is called by reflection
     PowerMockito.doAnswer(invocation -> {
@@ -53,19 +53,19 @@
       // checks that the context classloader is set correctly
       assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
       return null;
-    }).when(ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", any());
+    }).when(ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", any());
 
     try {
       SplitDeploymentUtil.runWithClassLoader(classLoader,
-          ClusterBasedJobCoordinator.class, "runClusterBasedJobCoordinator", args);
+          ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", args);
       assertEquals(previousContextClassLoader, Thread.currentThread().getContextClassLoader());
     } finally {
       // reset it explicitly just in case runWithClassLoader throws an exception
       Thread.currentThread().setContextClassLoader(previousContextClassLoader);
     }
     // make sure that the classloader got used
-    verify(classLoader).loadClass(ClusterBasedJobCoordinator.class.getName());
+    verify(classLoader).loadClass(ClusterBasedJobCoordinatorRunner.class.getName());
     // make sure runClusterBasedJobCoordinator only got called once
-    verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
+    verifyPrivate(ClusterBasedJobCoordinatorRunner.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
   }
 }
diff --git a/samza-shell/src/main/bash/run-jc.sh b/samza-shell/src/main/bash/run-jc.sh
index cf3e734..fc8a3c4 100755
--- a/samza-shell/src/main/bash/run-jc.sh
+++ b/samza-shell/src/main/bash/run-jc.sh
@@ -22,4 +22,4 @@
 # Set container name system properties for use in Log4J
 [[ $JAVA_OPTS != *-Dsamza.container.name* ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.name=samza-job-coordinator"
 
-exec $(dirname $0)/run-class.sh org.apache.samza.clustermanager.ClusterBasedJobCoordinator "$@"
+exec $(dirname $0)/run-class.sh org.apache.samza.clustermanager.ClusterBasedJobCoordinatorRunner "$@"