address comments
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index b750688..9183e13 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -113,9 +113,7 @@
       systemProducer.start();
       Runtime.getRuntime().addShutdownHook(new Thread(() -> {
         LOG.info("CoordinatorStreamStore Shut Down Hook thread is closing kafka clients");
-        this.systemProducer.stop();
-        this.systemConsumer.stop();
-        this.systemAdmin.stop();
+        this.close();
       }));
       iterator = new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
       readMessagesFromCoordinatorStream();
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 05fd8bc..0218caa 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -88,7 +88,7 @@
     Preconditions.checkNotNull(systemAdmin)
 
     systemAdmin.start()
-    Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerShutdownHook") {
+    Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerAdminShutdownHook") {
       override def run = systemAdmin.stop()
     })
 
@@ -115,7 +115,7 @@
     info(s"Starting the checkpoint SystemConsumer from oldest offset $oldestOffset")
     systemConsumer.register(checkpointSsp, oldestOffset)
     systemConsumer.start()
-    Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerShutdownHook") {
+    Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerClientShutdownHook") {
       override def run(): Unit = {
         producerRef.get().stop()
         systemConsumer.stop()