Merge pull request #1430 from MabelYC/addLabel

SAMZA-2574 : improve flexibility of SystemFactory interface
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 1cd345e..3ea7aa9 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -43,6 +43,14 @@
   default void stop() {};
 
   /**
+   * Reports whether this SystemAdmin has been stopped; this default implementation always returns false.
+   * @return true if this admin has been stopped, false otherwise
+   */
+  default boolean isStopped() {
+    return false;
+  }
+
+  /**
    * Fetches the offsets for the messages immediately after the supplied offsets
    * for a group of SystemStreamPartitions.
    *
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 6295ed6..63ee3c7 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -194,7 +194,7 @@
     this.hasDurableStores = new StorageConfig(config).hasDurableStores();
     this.state = new SamzaApplicationState(jobModelManager);
     // The systemAdmins should be started before partitionMonitor can be used. And it should be stopped when this coordinator is stopped.
-    this.systemAdmins = new SystemAdmins(config);
+    this.systemAdmins = new SystemAdmins(config, this.getClass().getSimpleName());
     this.partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
 
     Set<SystemStream> inputSystemStreams = JobModelUtil.getSystemStreams(jobModelManager.jobModel());
diff --git a/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
index a2da35d..9972f9d 100644
--- a/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
@@ -20,6 +20,7 @@
 package org.apache.samza.config;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -53,6 +54,8 @@
   static final String SAMZA_SYSTEM_OFFSET_UPCOMING = "upcoming";
   static final String SAMZA_SYSTEM_OFFSET_OLDEST = "oldest";
 
+  private Map<String, SystemAdmin> systemAdminMap = new HashMap<>();
+
   public SystemConfig(Config config) {
     super(config);
   }
@@ -88,12 +91,19 @@
    *
    * @return map of system name to {@link SystemAdmin}
    */
+  public Map<String, SystemAdmin> getSystemAdmins(String adminLabel) {
+    if (systemAdminMap.isEmpty()) { // admins are only (re)built while the cached map is empty
+      systemAdminMap = getSystemFactories().entrySet()
+          .stream()
+          .collect(Collectors.toMap(Entry::getKey,
+            systemNameToFactoryEntry -> systemNameToFactoryEntry.getValue()
+               .getAdmin(systemNameToFactoryEntry.getKey(), this, adminLabel)));
+    }
+    return systemAdminMap; // NOTE(review): once populated, a call with a different adminLabel still gets these cached admins
+  }
+
   public Map<String, SystemAdmin> getSystemAdmins() {
-    return getSystemFactories().entrySet()
-        .stream()
-        .collect(Collectors.toMap(Entry::getKey,
-          systemNameToFactoryEntry -> systemNameToFactoryEntry.getValue()
-              .getAdmin(systemNameToFactoryEntry.getKey(), this)));
+    return getSystemAdmins("");
   }
 
   /**
@@ -102,8 +112,15 @@
    * @param systemName System name
    * @return SystemAdmin of the system if it exists, otherwise null.
    */
+  public SystemAdmin getSystemAdmin(String systemName, String adminLabel) {
+    if (systemAdminMap.containsKey(systemName) && systemAdminMap.get(systemName).isStopped()) {
+      systemAdminMap = new HashMap<>(); // drop the stale cache with a fresh map; clear() would also empty maps already returned to callers
+    }
+    return getSystemAdmins(adminLabel).get(systemName); // rebuilds the admins when the cache was just dropped
+  }
+
   public SystemAdmin getSystemAdmin(String systemName) {
-    return getSystemAdmins().get(systemName);
+    return getSystemAdmin(systemName, "");
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
index f75a8dc..a1d417a 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
@@ -48,7 +48,7 @@
   private final SystemAdmins systemAdmins;
 
   public StreamManager(Config config) {
-    this(new SystemAdmins(config));
+    this(new SystemAdmins(config, StreamManager.class.getSimpleName()));
   }
 
   @VisibleForTesting
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 37b2c0b..7687eb2 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -338,7 +338,7 @@
       return false;
     }
     SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
-    SystemAdmins systemAdmins = new SystemAdmins(config);
+    SystemAdmins systemAdmins = new SystemAdmins(config, this.getClass().getSimpleName());
     systemAdmins.start();
     try {
       SystemAdmin coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index 9fb9975..cc51732 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -120,7 +120,7 @@
 
   @Override
   public JobModel getJobModel() {
-    SystemAdmins systemAdmins = new SystemAdmins(config);
+    SystemAdmins systemAdmins = new SystemAdmins(config, this.getClass().getSimpleName());
     StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, SystemClock.instance());
     systemAdmins.start();
     try {
diff --git a/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
index be15869..2ca81ca 100644
--- a/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
+++ b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
@@ -38,6 +38,11 @@
     this.systemAdminMap = systemConfig.getSystemAdmins();
   }
 
+  public SystemAdmins(Config config, String adminLabel) {
+    SystemConfig systemConfig = new SystemConfig(config);
+    this.systemAdminMap = systemConfig.getSystemAdmins(adminLabel);
+  }
+
   /**
    * Creates a new instance of {@link SystemAdmins} with an empty admin mapping.
    * @return New empty instance of {@link SystemAdmins}
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 0f7ff76..6224a3e 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -166,7 +166,7 @@
     }
     // if diagnostics is enabled, create diagnostics stream if it doesnt exist
 
-    SystemAdmins systemAdmins = new SystemAdmins(config);
+    SystemAdmins systemAdmins = new SystemAdmins(config, DiagnosticsUtil.class.getSimpleName());
     String diagnosticsSystemStreamName = new MetricsConfig(config)
         .getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)
         .orElseThrow(() -> new ConfigException("Missing required config: " +
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index e03ce8b..1489f67 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -146,7 +146,7 @@
       stop();
     });
     this.barrier =  new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
-    systemAdmins = new SystemAdmins(config);
+    systemAdmins = new SystemAdmins(config, this.getClass().getSimpleName());
     streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
     LocationIdProviderFactory locationIdProviderFactory =
         ReflectionUtil.getObj(new JobConfig(config).getLocationIdProviderFactory(), LocationIdProviderFactory.class);
diff --git a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index cae59d6..93536fa 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -99,7 +99,7 @@
       .getOrElse(throw new SamzaException("No system defined in config for rewriter %s." format rewriterName))
 
     var systemStreams = Seq.empty[String]
-    val systemAdmin = new SystemConfig(config).getSystemAdmin(systemName)
+    val systemAdmin = new SystemConfig(config).getSystemAdmin(systemName, this.getClass.getSimpleName)
     try {
       systemAdmin.start()
       systemStreams =
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index a5ed3dc..28c3135 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -215,7 +215,7 @@
     }).toMap
     info("Got system factories: %s" format systemFactories.keys)
 
-    val systemAdmins = new SystemAdmins(config)
+    val systemAdmins = new SystemAdmins(config, this.getClass.getSimpleName)
     info("Got system admins: %s" format systemAdmins.getSystemNames)
 
     val streamMetadataCache = new StreamMetadataCache(systemAdmins)
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 5a3f7b3..7c0e747 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -87,7 +87,7 @@
     val taskAssignmentManager = new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE), new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE))
     val taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE))
 
-    val systemAdmins = new SystemAdmins(config)
+    val systemAdmins = new SystemAdmins(config, this.getClass.getSimpleName)
     try {
       systemAdmins.start()
       val streamMetadataCache = new StreamMetadataCache(systemAdmins, 0)
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 37d1393..254f86d 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -54,7 +54,7 @@
    * @param config to create coordinator stream.
    */
   def createCoordinatorStream(config: Config): Unit = {
-    val systemAdmins = new SystemAdmins(config)
+    val systemAdmins = new SystemAdmins(config, this.getClass.getSimpleName)
 
     info("Creating coordinator stream")
     val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index a60752c..5e95df5 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -201,6 +201,11 @@
     }
   }
 
+  @Override
+  public boolean isStopped() {
+    return stopped.get();
+  }
+
   /**
    * Note! This method does not populate SystemStreamMetadata for each stream with real data.
    * Thus, this method should ONLY be used to get number of partitions for each stream.