clean up code
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index 9183e13..b72d231 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -111,10 +111,6 @@
       systemConsumer.start();
       systemProducer.register(SOURCE);
       systemProducer.start();
-      Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-        LOG.info("CoordinatorStreamStore Shut Down Hook thread is closing kafka clients");
-        this.close();
-      }));
       iterator = new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
       readMessagesFromCoordinatorStream();
     } else {
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index d0e705f..b1e4206 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -46,7 +46,6 @@
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 6145fd5..b1bcc4d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -89,9 +89,6 @@
     Preconditions.checkNotNull(systemAdmin)
 
     systemAdmin.start()
-    Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerAdminShutdownHook") {
-      override def run = systemAdmin.stop()
-    })
 
     info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
       s"partition count: ${checkpointSpec.getPartitionCount}")
@@ -117,12 +114,6 @@
     info(s"Starting the checkpoint SystemConsumer from oldest offset $oldestOffset")
     systemConsumer.register(checkpointSsp, oldestOffset)
     systemConsumer.start()
-    Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerClientShutdownHook") {
-      override def run(): Unit = {
-        producerRef.get().stop()
-        systemConsumer.stop()
-      }
-    })
   }
 
   /**