Fix perferomance bug with async commit
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
index 91f3df3..44b40b4 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
@@ -20,6 +20,7 @@
 package org.apache.samza.storage;
 
 import java.io.File;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import org.apache.samza.config.Config;
@@ -29,6 +30,7 @@
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.util.Clock;
 
 
@@ -40,6 +42,7 @@
   TaskBackupManager getBackupManager(JobContext jobContext,
       ContainerModel containerModel,
       TaskModel taskModel,
+      Map<String, SystemAdmin> systemNameSystemAdminMap,
       ExecutorService backupExecutor,
       MetricsRegistry taskInstanceMetricsRegistry,
       Config config,
diff --git a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
index 0772449..9f31171 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
@@ -40,6 +40,7 @@
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SSPMetadataCache;
 import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
@@ -66,13 +67,14 @@
   public TaskBackupManager getBackupManager(JobContext jobContext,
       ContainerModel containerModel,
       TaskModel taskModel,
+      Map<String, SystemAdmin> systemNameSystemAdminsMap,
       ExecutorService backupExecutor,
       MetricsRegistry metricsRegistry,
       Config config,
       Clock clock,
       File loggedStoreBaseDir,
       File nonLoggedStoreBaseDir) {
-    SystemAdmins systemAdmins = new SystemAdmins(config);
+    SystemAdmins systemAdmins = new SystemAdmins(systemNameSystemAdminsMap);
     StorageConfig storageConfig = new StorageConfig(config);
     Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
 
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
index e2512b4..dc60598 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
@@ -21,6 +21,7 @@
 
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import org.apache.commons.lang3.StringUtils;
@@ -41,6 +42,7 @@
 import org.apache.samza.storage.TaskRestoreManager;
 import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
 import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
+import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.ReflectionUtil;
 
@@ -51,6 +53,7 @@
       JobContext jobContext,
       ContainerModel containerModel,
       TaskModel taskModel,
+      Map<String, SystemAdmin> systemNameSystemAdminsMap,
       ExecutorService backupExecutor,
       MetricsRegistry metricsRegistry,
       Config config,
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index f0aa20b..ac5c198 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -571,6 +571,7 @@
       info ("Got task side input SSPs: %s" format taskSideInputSSPs)
 
       val taskBackupManagerMap = new util.HashMap[String, TaskBackupManager]()
+      val systemAdminsMap = systemAdmins.getSystemAdmins
       stateStorageBackendBackupFactories.asJava.forEach(new Consumer[StateBackendFactory] {
         override def accept(factory: StateBackendFactory): Unit = {
           val taskMetricsRegistry =
@@ -578,7 +579,7 @@
               taskInstanceMetrics.get(taskName).isDefined) taskInstanceMetrics.get(taskName).get.registry
             else new MetricsRegistryMap
           val taskBackupManager = factory.getBackupManager(jobContext, containerModel,
-            taskModel, commitThreadPool, taskMetricsRegistry, config, SystemClock.instance(),
+            taskModel, systemAdminsMap, commitThreadPool, taskMetricsRegistry, config, SystemClock.instance,
             loggedStorageBaseDir, nonLoggedStorageBaseDir)
           taskBackupManagerMap.put(factory.getClass.getName, taskBackupManager)
         }