SAMZA-2501: Optimizing startpoint manager to not make successive bootstrapMessage calls to coordinator-store (#1335)



Co-authored-by: Ray Manpreet Singh Matharu <rmatharu@rmatharu-mn1.linkedin.biz>
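
The optimization: StartpointManager#readStartpoint previously issued one coordinator-store read per SSP, so fanOut() generated successive bootstrapMessage calls; it now snapshots the store once via readWriteStore.all() and resolves each SSP/task key against that in-memory map. The patch also introduces a job.startpoint.enabled config (default: true) so that jobs which do not use startpoints can skip the fan-out and StartpointManager instantiation entirely.

A minimal sketch of how the new flag could be exercised; the MapConfig-based setup below is illustrative and not part of this patch:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.samza.config.Config;
    import org.apache.samza.config.JobConfig;
    import org.apache.samza.config.MapConfig;

    public class StartpointToggleSketch {
      public static void main(String[] args) {
        // Opt a job out of startpoint fan-out; the flag defaults to true when absent.
        Map<String, String> props = new HashMap<>();
        props.put("job.startpoint.enabled", "false");
        Config config = new MapConfig(props);

        // Mirrors the checks added to the job coordinators and the container launch path.
        if (new JobConfig(config).getStartpointEnabled()) {
          System.out.println("Startpoints enabled: fan-out runs against the coordinator store.");
        } else {
          System.out.println("Startpoints disabled: fan-out and StartpointManager creation are skipped.");
        }
      }
    }
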
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index e155b58..0cb80b2 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -262,13 +262,15 @@
       MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
       metadataResourceUtil.createResources();
 
-      // fan out the startpoints
-      StartpointManager startpointManager = createStartpointManager();
-      startpointManager.start();
-      try {
-        startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
-      } finally {
-        startpointManager.stop();
+      // fan out the startpoints if startpoints are enabled
+      if (new JobConfig(config).getStartpointEnabled()) {
+        StartpointManager startpointManager = createStartpointManager();
+        startpointManager.start();
+        try {
+          startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+        } finally {
+          startpointManager.stop();
+        }
       }
 
       // Remap changelog partitions to tasks
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 37bfe37..dff2991 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -146,6 +146,8 @@
   public static final String CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED =
       "samza.cluster.based.job.coordinator.dependency.isolation.enabled";
 
+  private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
+
   public JobConfig(Config config) {
     super(config);
   }
@@ -404,4 +406,8 @@
   public Optional<String> getConfigLoaderFactory() {
     return Optional.ofNullable(get(CONFIG_LOADER_FACTORY));
   }
+
+  public boolean getStartpointEnabled() {
+    return getBoolean(JOB_STARTPOINT_ENABLED, true);
+  }
 }
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 5a39520..ed0c875 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -396,8 +396,10 @@
     // Metadata store lifecycle managed outside of the SamzaContainer.
     // All manager lifecycles are managed in the SamzaContainer including startpointManager
     StartpointManager startpointManager = null;
-    if (metadataStore != null) {
+    if (metadataStore != null && new JobConfig(config).getStartpointEnabled()) {
       startpointManager = new StartpointManager(metadataStore);
+    } else if (!new JobConfig(config).getStartpointEnabled()) {
+      LOGGER.warn("StartpointManager not instantiated because startpoints is not enabled");
     } else {
       LOGGER.warn("StartpointManager cannot be instantiated because no metadata store defined for this stream processor");
     }
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index c68ec03..40d3d5c 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -102,7 +102,10 @@
       LocalityManager localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE));
 
       // StartpointManager wraps the coordinatorStreamStore in the namespaces internally
-      StartpointManager startpointManager = new StartpointManager(coordinatorStreamStore);
+      StartpointManager startpointManager = null;
+      if (new JobConfig(config).getStartpointEnabled()) {
+        startpointManager = new StartpointManager(coordinatorStreamStore);
+      }
 
       Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);
 
diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
index 9a196c3..f0a4cc3 100644
--- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
+++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
@@ -158,8 +158,24 @@
    * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}.
    *         It is empty if it does not exist or if it is too stale.
    */
+  @VisibleForTesting
   public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp) {
-    return readStartpoint(ssp, null);
+    Map<String, byte[]> startpointBytes = readWriteStore.all();
+    // there is no task-name to use as key for the startpoint in this case (only the ssp), so we use a null task-name
+    return readStartpoint(startpointBytes, ssp, null);
+  }
+
+  /**
+   * Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition} and {@link TaskName}.
+   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
+   * @param taskName The {@link TaskName} to fetch the {@link Startpoint} for.
+   * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}.
+   *         It is empty if it does not exist or if it is too stale.
+   */
+  @VisibleForTesting
+  public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp, TaskName taskName) {
+    Map<String, byte[]> startpointBytes = readWriteStore.all();
+    return readStartpoint(startpointBytes, ssp, taskName);
   }
 
   /**
@@ -169,11 +185,11 @@
    * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}.
    *         It is empty if it does not exist or if it is too stale.
    */
-  public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp, TaskName taskName) {
+  public Optional<Startpoint> readStartpoint(Map<String, byte[]> startpointMap, SystemStreamPartition ssp, TaskName taskName) {
     Preconditions.checkState(!stopped, "Underlying metadata store not available");
     Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
 
-    byte[] startpointBytes = readWriteStore.get(toReadWriteStoreKey(ssp, taskName));
+    byte[] startpointBytes = startpointMap.get(toReadWriteStoreKey(ssp, taskName));
 
     if (ArrayUtils.isNotEmpty(startpointBytes)) {
       try {
@@ -191,6 +207,7 @@
     return Optional.empty();
   }
 
+
   /**
    * Deletes the {@link Startpoint} for a {@link SystemStreamPartition}
    * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
@@ -240,6 +257,8 @@
     Instant now = Instant.now();
     HashMultimap<SystemStreamPartition, TaskName> deleteKeys = HashMultimap.create();
     HashMap<TaskName, StartpointFanOutPerTask> fanOuts = new HashMap<>();
+    Map<String, byte[]> startpointMap = readWriteStore.all();
+
     for (TaskName taskName : taskToSSPs.keySet()) {
       Set<SystemStreamPartition> ssps = taskToSSPs.get(taskName);
       if (CollectionUtils.isEmpty(ssps)) {
@@ -247,10 +266,10 @@
         continue;
       }
       for (SystemStreamPartition ssp : ssps) {
-        Optional<Startpoint> startpoint = readStartpoint(ssp); // Read SSP-only key
+        Optional<Startpoint> startpoint = readStartpoint(startpointMap, ssp, null); // Read SSP-only key
         startpoint.ifPresent(sp -> deleteKeys.put(ssp, null));
 
-        Optional<Startpoint> startpointForTask = readStartpoint(ssp, taskName); // Read SSP+taskName key
+        Optional<Startpoint> startpointForTask = readStartpoint(startpointMap, ssp, taskName); // Read SSP+taskName key
         startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName));
 
         Optional<Startpoint> startpointWithPrecedence = resolveStartpointPrecendence(startpoint, startpointForTask);
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index feaabba..86c1f06 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -326,13 +326,15 @@
         }
         configStore.flush();
 
-        // fan out the startpoints
-        StartpointManager startpointManager = createStartpointManager();
-        startpointManager.start();
-        try {
-          startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
-        } finally {
-          startpointManager.stop();
+        if (new JobConfig(config).getStartpointEnabled()) {
+          // fan out the startpoints
+          StartpointManager startpointManager = createStartpointManager();
+          startpointManager.start();
+          try {
+            startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+          } finally {
+            startpointManager.stop();
+          }
         }
       } else {
         LOG.warn("No metadata store registered to this job coordinator. Config not written to the metadata store and no Startpoints fan out.");
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index fab8c6e..3264932 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -91,13 +91,15 @@
     val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, config)
     metadataResourceUtil.createResources()
 
-    // fan out the startpoints
-    val startpointManager = new StartpointManager(coordinatorStreamStore)
-    startpointManager.start()
-    try {
-      startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
-    } finally {
-      startpointManager.stop()
+    if (new JobConfig(config).getStartpointEnabled()) {
+      // fan out the startpoints
+      val startpointManager = new StartpointManager(coordinatorStreamStore)
+      startpointManager.start()
+      try {
+        startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
+      } finally {
+        startpointManager.stop()
+      }
     }
 
     val taskConfig = new TaskConfig(config)
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index c1a3683..79bd181 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -91,13 +91,15 @@
     val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, config)
     metadataResourceUtil.createResources()
 
-    // fan out the startpoints
-    val startpointManager = new StartpointManager(coordinatorStreamStore)
-    startpointManager.start()
-    try {
-      startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
-    } finally {
-      startpointManager.stop()
+    if (new JobConfig(config).getStartpointEnabled()) {
+      // fan out the startpoints
+      val startpointManager = new StartpointManager(coordinatorStreamStore)
+      startpointManager.start()
+      try {
+        startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
+      } finally {
+        startpointManager.stop()
+      }
     }
 
     val containerId = "0"