SAMZA-2501 : Optimizing startpoint manager to not make successive bootstrapMessage calls to coordinator-store (#1335)
Co-authored-by: Ray Manpreet Singh Matharu <rmatharu@rmatharu-mn1.linkedin.biz>
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index e155b58..0cb80b2 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -262,13 +262,15 @@
MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
metadataResourceUtil.createResources();
- // fan out the startpoints
- StartpointManager startpointManager = createStartpointManager();
- startpointManager.start();
- try {
- startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
- } finally {
- startpointManager.stop();
+ // fan out the startpoints if startpoints is enabled
+ if (new JobConfig(config).getStartpointEnabled()) {
+ StartpointManager startpointManager = createStartpointManager();
+ startpointManager.start();
+ try {
+ startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+ } finally {
+ startpointManager.stop();
+ }
}
// Remap changelog partitions to tasks
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 37bfe37..dff2991 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -146,6 +146,8 @@
public static final String CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED =
"samza.cluster.based.job.coordinator.dependency.isolation.enabled";
+ private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
+
public JobConfig(Config config) {
super(config);
}
@@ -404,4 +406,8 @@
public Optional<String> getConfigLoaderFactory() {
return Optional.ofNullable(get(CONFIG_LOADER_FACTORY));
}
+
+ public boolean getStartpointEnabled() {
+ return getBoolean(JOB_STARTPOINT_ENABLED, true);
+ }
}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 5a39520..ed0c875 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -396,8 +396,10 @@
// Metadata store lifecycle managed outside of the SamzaContainer.
// All manager lifecycles are managed in the SamzaContainer including startpointManager
StartpointManager startpointManager = null;
- if (metadataStore != null) {
+ if (metadataStore != null && new JobConfig(config).getStartpointEnabled()) {
startpointManager = new StartpointManager(metadataStore);
+ } else if (!new JobConfig(config).getStartpointEnabled()) {
+ LOGGER.warn("StartpointManager not instantiated because startpoints is not enabled");
} else {
LOGGER.warn("StartpointManager cannot be instantiated because no metadata store defined for this stream processor");
}
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index c68ec03..40d3d5c 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -102,7 +102,10 @@
LocalityManager localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE));
// StartpointManager wraps the coordinatorStreamStore in the namespaces internally
- StartpointManager startpointManager = new StartpointManager(coordinatorStreamStore);
+ StartpointManager startpointManager = null;
+ if (new JobConfig(config).getStartpointEnabled()) {
+ startpointManager = new StartpointManager(coordinatorStreamStore);
+ }
Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);
diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
index 9a196c3..f0a4cc3 100644
--- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
+++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
@@ -158,8 +158,24 @@
* @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}.
* It is empty if it does not exist or if it is too stale.
*/
+ @VisibleForTesting
public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp) {
- return readStartpoint(ssp, null);
+ Map<String, byte[]> startpointBytes = readWriteStore.all();
+ // there is no task-name to use as key for the startpoint in this case (only the ssp), so we use a null task-name
+ return readStartpoint(startpointBytes, ssp, null);
+ }
+
+ /**
+ * Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition} and {@link TaskName}.
+ * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
+ * @param taskName the {@link TaskName} to fetch the {@link Startpoint} for.
+ * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}.
+ * It is empty if it does not exist or if it is too stale.
+ */
+ @VisibleForTesting
+ public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp, TaskName taskName) {
+ Map<String, byte[]> startpointBytes = readWriteStore.all();
+ return readStartpoint(startpointBytes, ssp, taskName);
}
/**
@@ -169,11 +185,11 @@
* @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}.
* It is empty if it does not exist or if it is too stale.
*/
- public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp, TaskName taskName) {
+ public Optional<Startpoint> readStartpoint(Map<String, byte[]> startpointMap, SystemStreamPartition ssp, TaskName taskName) {
Preconditions.checkState(!stopped, "Underlying metadata store not available");
Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
- byte[] startpointBytes = readWriteStore.get(toReadWriteStoreKey(ssp, taskName));
+ byte[] startpointBytes = startpointMap.get(toReadWriteStoreKey(ssp, taskName));
if (ArrayUtils.isNotEmpty(startpointBytes)) {
try {
@@ -191,6 +207,7 @@
return Optional.empty();
}
+
/**
* Deletes the {@link Startpoint} for a {@link SystemStreamPartition}
* @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
@@ -240,6 +257,8 @@
Instant now = Instant.now();
HashMultimap<SystemStreamPartition, TaskName> deleteKeys = HashMultimap.create();
HashMap<TaskName, StartpointFanOutPerTask> fanOuts = new HashMap<>();
+ Map<String, byte[]> startpointMap = readWriteStore.all();
+
for (TaskName taskName : taskToSSPs.keySet()) {
Set<SystemStreamPartition> ssps = taskToSSPs.get(taskName);
if (CollectionUtils.isEmpty(ssps)) {
@@ -247,10 +266,10 @@
continue;
}
for (SystemStreamPartition ssp : ssps) {
- Optional<Startpoint> startpoint = readStartpoint(ssp); // Read SSP-only key
+ Optional<Startpoint> startpoint = readStartpoint(startpointMap, ssp, null); // Read SSP-only key
startpoint.ifPresent(sp -> deleteKeys.put(ssp, null));
- Optional<Startpoint> startpointForTask = readStartpoint(ssp, taskName); // Read SSP+taskName key
+ Optional<Startpoint> startpointForTask = readStartpoint(startpointMap, ssp, taskName); // Read SSP+taskName key
startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName));
Optional<Startpoint> startpointWithPrecedence = resolveStartpointPrecendence(startpoint, startpointForTask);
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index feaabba..86c1f06 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -326,13 +326,15 @@
}
configStore.flush();
- // fan out the startpoints
- StartpointManager startpointManager = createStartpointManager();
- startpointManager.start();
- try {
- startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
- } finally {
- startpointManager.stop();
+ if (new JobConfig(config).getStartpointEnabled()) {
+ // fan out the startpoints
+ StartpointManager startpointManager = createStartpointManager();
+ startpointManager.start();
+ try {
+ startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+ } finally {
+ startpointManager.stop();
+ }
}
} else {
LOG.warn("No metadata store registered to this job coordinator. Config not written to the metadata store and no Startpoints fan out.");
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index fab8c6e..3264932 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -91,13 +91,15 @@
val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, config)
metadataResourceUtil.createResources()
- // fan out the startpoints
- val startpointManager = new StartpointManager(coordinatorStreamStore)
- startpointManager.start()
- try {
- startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
- } finally {
- startpointManager.stop()
+ if (new JobConfig(config).getStartpointEnabled()) {
+ // fan out the startpoints
+ val startpointManager = new StartpointManager(coordinatorStreamStore)
+ startpointManager.start()
+ try {
+ startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
+ } finally {
+ startpointManager.stop()
+ }
}
val taskConfig = new TaskConfig(config)
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index c1a3683..79bd181 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -91,13 +91,15 @@
val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, config)
metadataResourceUtil.createResources()
- // fan out the startpoints
- val startpointManager = new StartpointManager(coordinatorStreamStore)
- startpointManager.start()
- try {
- startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
- } finally {
- startpointManager.stop()
+ if (new JobConfig(config).getStartpointEnabled()) {
+ // fan out the startpoints
+ val startpointManager = new StartpointManager(coordinatorStreamStore)
+ startpointManager.start()
+ try {
+ startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
+ } finally {
+ startpointManager.stop()
+ }
}
val containerId = "0"