SAMZA-2488: Add JobCoordinatorLaunchUtil to handle common logic when launching job coordinator. (#1318)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index b9d054b..e7647f4 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -35,9 +35,6 @@
 import org.apache.samza.SamzaException;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.application.ApplicationUtil;
-import org.apache.samza.application.descriptors.ApplicationDescriptor;
-import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
-import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.classloader.IsolatingClassLoaderFactory;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
 import org.apache.samza.config.ApplicationConfig;
@@ -58,7 +55,6 @@
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
-import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.JobModelUtil;
@@ -571,8 +567,16 @@
           throw new SamzaException(e);
         }
       } else {
-        ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
-        jc.run();
+        JobConfig jobConfig = new JobConfig(submissionConfig);
+
+        if (!jobConfig.getConfigLoaderFactory().isPresent()) {
+          throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader");
+        }
+
+        // load full job config with ConfigLoader
+        Config originalConfig = ConfigUtil.loadConfig(submissionConfig);
+
+        JobCoordinatorLaunchUtil.run(ApplicationUtil.fromConfig(originalConfig), originalConfig);
       }
 
       LOG.info("Finished running ClusterBasedJobCoordinator");
@@ -615,49 +619,6 @@
   }
 
   /**
-   * Initialize {@link ClusterBasedJobCoordinator} with submission config, full job config will be fetched using
-   * specified {@link org.apache.samza.config.ConfigLoaderFactory}
-   *
-   * @param submissionConfig specifies {@link org.apache.samza.config.ConfigLoaderFactory}
-   * @return {@link ClusterBasedJobCoordinator}
-   */
-  @VisibleForTesting
-  static ClusterBasedJobCoordinator createFromConfigLoader(Config submissionConfig) {
-    JobConfig jobConfig = new JobConfig(submissionConfig);
-
-    if (!jobConfig.getConfigLoaderFactory().isPresent()) {
-      throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader");
-    }
-
-    MetricsRegistryMap metrics = new MetricsRegistryMap();
-    // load full job config with ConfigLoader
-    Config originalConfig = ConfigUtil.loadConfig(submissionConfig);
-
-    // Execute planning
-    ApplicationDescriptorImpl<? extends ApplicationDescriptor>
-        appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig);
-    RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
-    List<JobConfig> jobConfigs = planner.prepareJobs();
-
-    if (jobConfigs.size() != 1) {
-      throw new SamzaException("Only support single remote job is supported.");
-    }
-
-    Config config = jobConfigs.get(0);
-
-    // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run
-    CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true);
-    DiagnosticsUtil.createDiagnosticsStream(config);
-    MetadataStore metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), metrics);
-    metadataStore.init();
-
-    return new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        config);
-  }
-
-  /**
    * Convert Samza config to command line arguments to invoke app.main.class
    *
    * @param config Samza config to convert.
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
new file mode 100644
index 0000000..fc1d34e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.List;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.execution.RemoteJobPlanner;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.util.CoordinatorStreamUtil;
+import org.apache.samza.util.DiagnosticsUtil;
+
+
+/**
+ * Util class to launch and run {@link ClusterBasedJobCoordinator}.
+ * This util is used by both the high/low-level API and Beam API Samza jobs.
+ */
+public class JobCoordinatorLaunchUtil {
+  /**
+   * Run {@link ClusterBasedJobCoordinator} with full job config.
+   *
+   * @param app SamzaApplication to run.
+   * @param config full job config.
+   */
+  @SuppressWarnings("rawtypes")
+  public static void run(SamzaApplication app, Config config) {
+    // Execute planning
+    ApplicationDescriptorImpl<? extends ApplicationDescriptor>
+        appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
+    RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
+    List<JobConfig> jobConfigs = planner.prepareJobs();
+
+    if (jobConfigs.size() != 1) {
+      throw new SamzaException("Only a single remote job is supported.");
+    }
+
+    Config finalConfig = jobConfigs.get(0);
+
+    // This needs to be consistent with RemoteApplicationRunner#run, where JobRunner#submit is called instead of JobRunner#run.
+    CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
+    DiagnosticsUtil.createDiagnosticsStream(finalConfig);
+    MetricsRegistryMap metrics = new MetricsRegistryMap();
+    MetadataStore
+        metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(finalConfig), metrics);
+    // MetadataStore will be closed in ClusterBasedJobCoordinator#onShutDown
+    // initialization of MetadataStore can be moved to ClusterBasedJobCoordinator after we clean up
+    // ClusterBasedJobCoordinator#createFromMetadataStore
+    metadataStore.init();
+
+    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
+        metrics,
+        metadataStore,
+        finalConfig);
+    jc.run();
+  }
+
+  private JobCoordinatorLaunchUtil() {}
+}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 513c075..967bc81 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -23,30 +23,25 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.MockStreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
 import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.system.MockSystemFactory;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.ConfigUtil;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.junit.After;
 import org.junit.Before;
@@ -68,7 +63,6 @@
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.verifyPrivate;
-import static org.powermock.api.mockito.PowerMockito.verifyNew;
 
 
 /**
@@ -213,35 +207,6 @@
     verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
   }
 
-  @Test(expected = SamzaException.class)
-  public void testCreateFromConfigLoaderWithoutConfigLoaderFactory() {
-    ClusterBasedJobCoordinator.createFromConfigLoader(new MapConfig());
-  }
-
-  @Test
-  public void testCreateFromConfigLoader() throws Exception {
-    Map<String, String> config = new HashMap<>();
-    config.put(ApplicationConfig.APP_CLASS, MockStreamApplication.class.getCanonicalName());
-    config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName());
-    config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",
-        getClass().getResource("/test.properties").getPath());
-    Config submissionConfig = new MapConfig(config);
-    JobConfig fullJobConfig = new JobConfig(ConfigUtil.loadConfig(submissionConfig));
-
-    RemoteJobPlanner mockJobPlanner = mock(RemoteJobPlanner.class);
-    CoordinatorStreamStore mockCoordinatorStreamStore = mock(CoordinatorStreamStore.class);
-
-    PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mock(ClusterBasedJobCoordinator.class));
-    PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, "buildCoordinatorStreamConfig", any());
-    PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
-    PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(mockJobPlanner);
-    when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullJobConfig));
-
-    ClusterBasedJobCoordinator.createFromConfigLoader(submissionConfig);
-
-    verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(fullJobConfig));
-  }
-
   @Test
   public void testToArgs() {
     ApplicationConfig appConfig = new ApplicationConfig(new MapConfig(ImmutableMap.of(
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
new file mode 100644
index 0000000..827e312
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.application.MockStreamApplication;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.execution.RemoteJobPlanner;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.util.ConfigUtil;
+import org.apache.samza.util.CoordinatorStreamUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.verifyNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    CoordinatorStreamUtil.class,
+    JobCoordinatorLaunchUtil.class,
+    CoordinatorStreamStore.class,
+    RemoteJobPlanner.class})
+public class TestJobCoordinatorLaunchUtil {
+  @Test
+  public void testCreateFromConfigLoader() throws Exception {
+    Map<String, String> config = new HashMap<>();
+    config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getName());
+    config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",
+        getClass().getResource("/test.properties").getPath());
+    JobConfig originalConfig = new JobConfig(ConfigUtil.loadConfig(new MapConfig(config)));
+    JobConfig fullJobConfig = new JobConfig(new MapConfig(originalConfig, Collections.singletonMap("isAfterPlanning", "true")));
+
+    RemoteJobPlanner mockJobPlanner = mock(RemoteJobPlanner.class);
+    CoordinatorStreamStore mockCoordinatorStreamStore = mock(CoordinatorStreamStore.class);
+    ClusterBasedJobCoordinator mockJC = mock(ClusterBasedJobCoordinator.class);
+
+    PowerMockito.mockStatic(CoordinatorStreamUtil.class);
+    PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, "buildCoordinatorStreamConfig", any());
+    PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
+    PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(mockJobPlanner);
+    PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mockJC);
+    when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullJobConfig));
+
+    JobCoordinatorLaunchUtil.run(new MockStreamApplication(), originalConfig);
+
+    verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(fullJobConfig));
+    verify(mockJC, times(1)).run();
+  }
+}