Fix perferomance bug with async commit
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
index 91f3df3..44b40b4 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
@@ -20,6 +20,7 @@
package org.apache.samza.storage;
import java.io.File;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.samza.config.Config;
@@ -29,6 +30,7 @@
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
import org.apache.samza.util.Clock;
@@ -40,6 +42,7 @@
TaskBackupManager getBackupManager(JobContext jobContext,
ContainerModel containerModel,
TaskModel taskModel,
+ Map<String, SystemAdmin> systemNameSystemAdminMap,
ExecutorService backupExecutor,
MetricsRegistry taskInstanceMetricsRegistry,
Config config,
diff --git a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
index 0772449..9f31171 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
@@ -40,6 +40,7 @@
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SSPMetadataCache;
import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
@@ -66,13 +67,14 @@
public TaskBackupManager getBackupManager(JobContext jobContext,
ContainerModel containerModel,
TaskModel taskModel,
+ Map<String, SystemAdmin> systemNameSystemAdminsMap,
ExecutorService backupExecutor,
MetricsRegistry metricsRegistry,
Config config,
Clock clock,
File loggedStoreBaseDir,
File nonLoggedStoreBaseDir) {
- SystemAdmins systemAdmins = new SystemAdmins(config);
+ SystemAdmins systemAdmins = new SystemAdmins(systemNameSystemAdminsMap);
StorageConfig storageConfig = new StorageConfig(config);
Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
index e2512b4..dc60598 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
@@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import java.io.File;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.StringUtils;
@@ -41,6 +42,7 @@
import org.apache.samza.storage.TaskRestoreManager;
import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
+import org.apache.samza.system.SystemAdmin;
import org.apache.samza.util.Clock;
import org.apache.samza.util.ReflectionUtil;
@@ -51,6 +53,7 @@
JobContext jobContext,
ContainerModel containerModel,
TaskModel taskModel,
+ Map<String, SystemAdmin> systemNameSystemAdminsMap,
ExecutorService backupExecutor,
MetricsRegistry metricsRegistry,
Config config,
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index f0aa20b..ac5c198 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -571,6 +571,7 @@
info ("Got task side input SSPs: %s" format taskSideInputSSPs)
val taskBackupManagerMap = new util.HashMap[String, TaskBackupManager]()
+ val systemAdminsMap = systemAdmins.getSystemAdmins
stateStorageBackendBackupFactories.asJava.forEach(new Consumer[StateBackendFactory] {
override def accept(factory: StateBackendFactory): Unit = {
val taskMetricsRegistry =
@@ -578,7 +579,7 @@
taskInstanceMetrics.get(taskName).isDefined) taskInstanceMetrics.get(taskName).get.registry
else new MetricsRegistryMap
val taskBackupManager = factory.getBackupManager(jobContext, containerModel,
- taskModel, commitThreadPool, taskMetricsRegistry, config, SystemClock.instance(),
+ taskModel, systemAdminsMap, commitThreadPool, taskMetricsRegistry, config, SystemClock.instance,
loggedStorageBaseDir, nonLoggedStorageBaseDir)
taskBackupManagerMap.put(factory.getClass.getName, taskBackupManager)
}