modifies to close the clients according to lifecycle
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
index 1050662..039f597 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
@@ -55,6 +55,7 @@
   public void createResources() {
     if (checkpointManager != null) {
       checkpointManager.createResources();
+      checkpointManager.stop();
     }
     createChangelogStreams();
   }
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 7687eb2..cc24345 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -441,15 +441,24 @@
 
     @Override
     public void afterFailure(Throwable t) {
-      processors.removeIf(pair -> pair.getLeft().equals(processor));
-
+      // we need to close associated coordinator metadata store, although the processor failed
+      processors.forEach(sp -> {
+        if (sp.getLeft().equals(processor)) {
+          if (sp.getRight() != null) {
+            sp.getRight().close();
+          }
+          processors.remove(sp);
+        }
+      });
       // the processor stopped with failure, this is logging the first processor's failure as the cause of
       // the whole application failure
       if (failure.compareAndSet(null, t)) {
         // shutdown the other processors
         processors.forEach(sp -> {
-          sp.getLeft().stop();    // Stop StreamProcessor
-          sp.getRight().close();  // Close associated coordinator metadata store
+          sp.getLeft().stop();     // Stop StreamProcessor
+          if (sp.getRight() != null) {
+            sp.getRight().close(); // Close associated coordinator metadata store
+          }
         });
       }
 
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 6224a3e..d0e705f 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -166,7 +166,6 @@
     }
     // if diagnostics is enabled, create diagnostics stream if it doesnt exist
 
-    SystemAdmins systemAdmins = new SystemAdmins(config, DiagnosticsUtil.class.getSimpleName());
     String diagnosticsSystemStreamName = new MetricsConfig(config)
         .getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)
         .orElseThrow(() -> new ConfigException("Missing required config: " +
@@ -174,7 +173,9 @@
                 MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)));
 
     SystemStream diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsSystemStreamName);
-    SystemAdmin diagnosticsSysAdmin = systemAdmins.getSystemAdmin(diagnosticsSystemStream.getSystem());
+    SystemConfig systemConfig = new SystemConfig(config);
+    SystemAdmin diagnosticsSysAdmin = systemConfig.getSystemFactories().get(diagnosticsSystemStream.getSystem())
+        .getAdmin(diagnosticsSystemStream.getSystem(), config, DiagnosticsUtil.class.getSimpleName());
     StreamSpec diagnosticsStreamSpec = new StreamSpec(DIAGNOSTICS_STREAM_ID, diagnosticsSystemStream.getStream(),
         diagnosticsSystemStream.getSystem(), new StreamConfig(config).getStreamProperties(DIAGNOSTICS_STREAM_ID));
 
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 254f86d..b9469cb 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -54,11 +54,12 @@
    * @param config to create coordinator stream.
    */
   def createCoordinatorStream(config: Config): Unit = {
-    val systemAdmins = new SystemAdmins(config, this.getClass.getSimpleName)
-
     info("Creating coordinator stream")
     val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
-    val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
+    val systemConfig = new SystemConfig(config)
+    val coordinatorSystemAdmin = systemConfig.getSystemFactories.get(coordinatorSystemStream.getSystem)
+      .getAdmin(coordinatorSystemStream.getSystem, config, classOf[DiagnosticsUtil].getSimpleName)
+
     coordinatorSystemAdmin.start()
     CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
     coordinatorSystemAdmin.stop()
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 0218caa..6145fd5 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -83,6 +83,7 @@
 
   /**
     * Create checkpoint stream prior to start.
+    * Need to close KafkaCheckPointManager after createResources
     */
   override def createResources(): Unit = {
     Preconditions.checkNotNull(systemAdmin)
@@ -109,7 +110,8 @@
     // register and start a producer for the checkpoint topic
     info("Starting the checkpoint SystemProducer")
     producerRef.get().start()
-
+    info("Starting the checkpoint SystemAdmin")
+    systemAdmin.start()
     // register and start a consumer for the checkpoint topic
     val oldestOffset = getOldestOffset(checkpointSsp)
     info(s"Starting the checkpoint SystemConsumer from oldest offset $oldestOffset")