SAMZA-2537: Fix JobCoordinatorLaunchUtil's execution sequence.

Symptom: A new Samza job fail to deploy.
Cause: For a new samza job, coordinator stream may have not being created yet, therefore, JobCoordinatorLaunchUtil will fail when trying to fetch launch config from coordinator stream.
Changes: Update JobCoordinatorLaunchUtil to create coordinator stream if does not exist before fetching launch config from it.
Tests: None
API Changes: None
Upgrade Instructions: None
Usage Instructions: None 

Author: Ke Wu <kwu@linkedin.com>

Reviewers: mynameborat <bharathkk@apache.org>

Closes #1370 from kw2542/SAMZA-2537
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
index c943c0c..63a1f5c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -59,6 +59,8 @@
     }
 
     Config fullConfig = jobConfigs.get(0);
+    // Create coordinator stream if does not exist before fetching launch config from it.
+    CoordinatorStreamUtil.createCoordinatorStream(fullConfig);
     MetricsRegistryMap metrics = new MetricsRegistryMap();
     MetadataStore
         metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(fullConfig), metrics);
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index bf6aeb3..37d1393 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -49,6 +49,22 @@
   }
 
   /**
+   * Creates coordinator stream from config if it does not exist, otherwise no-op.
+   *
+   * @param config to create coordinator stream.
+   */
+  def createCoordinatorStream(config: Config): Unit = {
+    val systemAdmins = new SystemAdmins(config)
+
+    info("Creating coordinator stream")
+    val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
+    val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
+    coordinatorSystemAdmin.start()
+    CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
+    coordinatorSystemAdmin.stop()
+  }
+
+  /**
     * Creates a coordinator stream.
     * @param coordinatorSystemStream the {@see SystemStream} that describes the stream to create.
     * @param coordinatorSystemAdmin the {@see SystemAdmin} used to create the stream.
@@ -157,15 +173,7 @@
     debug("config: %s" format config)
     val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
     val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
-    val systemAdmins = new SystemAdmins(config)
-
-    // Create the coordinator stream if it doesn't exist
-    info("Creating coordinator stream")
-    val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
-    val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
-    coordinatorSystemAdmin.start()
-    CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
-    coordinatorSystemAdmin.stop()
+    CoordinatorStreamUtil.createCoordinatorStream(config)
 
     if (resetJobConfig) {
       info("Storing config in coordinator stream.")
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
index 4bf0aaa..af27423 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
@@ -38,12 +38,15 @@
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.verifyNew;
+import static org.powermock.api.mockito.PowerMockito.verifyStatic;
+
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({
@@ -68,6 +71,7 @@
     ClusterBasedJobCoordinator mockJC = mock(ClusterBasedJobCoordinator.class);
 
     PowerMockito.mockStatic(CoordinatorStreamUtil.class);
+    PowerMockito.doNothing().when(CoordinatorStreamUtil.class, "createCoordinatorStream", any());
     PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, "buildCoordinatorStreamConfig", any());
     PowerMockito.doReturn(autoSizingConfig).when(CoordinatorStreamUtil.class, "readLaunchConfigFromCoordinatorStream", any(), any());
     PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
@@ -79,5 +83,9 @@
 
     verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(finalConfig));
     verify(mockJC, times(1)).run();
+    verifyStatic(times(1));
+    CoordinatorStreamUtil.createCoordinatorStream(any());
+    verifyStatic(times(1));
+    CoordinatorStreamUtil.writeConfigToCoordinatorStream(any(), anyBoolean());
   }
 }