address comments
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index b750688..9183e13 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -113,9 +113,7 @@
systemProducer.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOG.info("CoordinatorStreamStore Shut Down Hook thread is closing kafka clients");
- this.systemProducer.stop();
- this.systemConsumer.stop();
- this.systemAdmin.stop();
+ this.close();
}));
iterator = new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
readMessagesFromCoordinatorStream();
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 05fd8bc..0218caa 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -88,7 +88,7 @@
Preconditions.checkNotNull(systemAdmin)
systemAdmin.start()
- Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerShutdownHook") {
+ Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerAdminShutdownHook") {
override def run = systemAdmin.stop()
})
@@ -115,7 +115,7 @@
info(s"Starting the checkpoint SystemConsumer from oldest offset $oldestOffset")
systemConsumer.register(checkpointSsp, oldestOffset)
systemConsumer.start()
- Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerShutdownHook") {
+ Runtime.getRuntime.addShutdownHook(new Thread("KafkaCheckPointManagerClientShutdownHook") {
override def run(): Unit = {
producerRef.get().stop()
systemConsumer.stop()