clean up code
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index 9183e13..b72d231 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -111,10 +111,6 @@
systemConsumer.start();
systemProducer.register(SOURCE);
systemProducer.start();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- LOG.info("CoordinatorStreamStore Shut Down Hook thread is closing kafka clients");
- this.close();
- }));
iterator = new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
readMessagesFromCoordinatorStream();
} else {
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index d0e705f..b1e4206 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -46,7 +46,6 @@
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 6145fd5..b1bcc4d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -89,9 +89,6 @@
Preconditions.checkNotNull(systemAdmin)
systemAdmin.start()
- Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerAdminShutdownHook") {
- override def run = systemAdmin.stop()
- })
info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
s"partition count: ${checkpointSpec.getPartitionCount}")
@@ -117,12 +114,6 @@
info(s"Starting the checkpoint SystemConsumer from oldest offset $oldestOffset")
systemConsumer.register(checkpointSsp, oldestOffset)
systemConsumer.start()
- Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerClientShutdownHook") {
- override def run(): Unit = {
- producerRef.get().stop()
- systemConsumer.stop()
- }
- })
}
/**