SAMZA-2537: Fix JobCoordinatorLaunchUtil's execution sequence.
Symptom: A new Samza job fail to deploy.
Cause: For a new samza job, coordinator stream may have not being created yet, therefore, JobCoordinatorLaunchUtil will fail when trying to fetch launch config from coordinator stream.
Changes: Update JobCoordinatorLaunchUtil to create coordinator stream if does not exist before fetching launch config from it.
Tests: None
API Changes: None
Upgrade Instructions: None
Usage Instructions: None
Author: Ke Wu <kwu@linkedin.com>
Reviewers: mynameborat <bharathkk@apache.org>
Closes #1370 from kw2542/SAMZA-2537
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
index c943c0c..63a1f5c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -59,6 +59,8 @@
}
Config fullConfig = jobConfigs.get(0);
+ // Create coordinator stream if does not exist before fetching launch config from it.
+ CoordinatorStreamUtil.createCoordinatorStream(fullConfig);
MetricsRegistryMap metrics = new MetricsRegistryMap();
MetadataStore
metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(fullConfig), metrics);
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index bf6aeb3..37d1393 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -49,6 +49,22 @@
}
/**
+ * Creates coordinator stream from config if it does not exist, otherwise no-op.
+ *
+ * @param config to create coordinator stream.
+ */
+ def createCoordinatorStream(config: Config): Unit = {
+ val systemAdmins = new SystemAdmins(config)
+
+ info("Creating coordinator stream")
+ val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
+ val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
+ coordinatorSystemAdmin.start()
+ CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
+ coordinatorSystemAdmin.stop()
+ }
+
+ /**
* Creates a coordinator stream.
* @param coordinatorSystemStream the {@see SystemStream} that describes the stream to create.
* @param coordinatorSystemAdmin the {@see SystemAdmin} used to create the stream.
@@ -157,15 +173,7 @@
debug("config: %s" format config)
val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
- val systemAdmins = new SystemAdmins(config)
-
- // Create the coordinator stream if it doesn't exist
- info("Creating coordinator stream")
- val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
- val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
- coordinatorSystemAdmin.start()
- CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
- coordinatorSystemAdmin.stop()
+ CoordinatorStreamUtil.createCoordinatorStream(config)
if (resetJobConfig) {
info("Storing config in coordinator stream.")
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
index 4bf0aaa..af27423 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
@@ -38,12 +38,15 @@
import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.verifyNew;
+import static org.powermock.api.mockito.PowerMockito.verifyStatic;
+
@RunWith(PowerMockRunner.class)
@PrepareForTest({
@@ -68,6 +71,7 @@
ClusterBasedJobCoordinator mockJC = mock(ClusterBasedJobCoordinator.class);
PowerMockito.mockStatic(CoordinatorStreamUtil.class);
+ PowerMockito.doNothing().when(CoordinatorStreamUtil.class, "createCoordinatorStream", any());
PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, "buildCoordinatorStreamConfig", any());
PowerMockito.doReturn(autoSizingConfig).when(CoordinatorStreamUtil.class, "readLaunchConfigFromCoordinatorStream", any(), any());
PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
@@ -79,5 +83,9 @@
verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(finalConfig));
verify(mockJC, times(1)).run();
+ verifyStatic(times(1));
+ CoordinatorStreamUtil.createCoordinatorStream(any());
+ verifyStatic(times(1));
+ CoordinatorStreamUtil.writeConfigToCoordinatorStream(any(), anyBoolean());
}
}