[hotfix][tests] Close Kafka AdminClients to prevent resource leaks
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 8297bdd..26b16e7 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -350,10 +350,11 @@
 
     @Override
     public int getLeaderToShutDown(String topic) throws Exception {
-        AdminClient client = AdminClient.create(getStandardProperties());
-        TopicDescription result =
-                client.describeTopics(Collections.singleton(topic)).all().get().get(topic);
-        return result.partitions().get(0).leader().id();
+        try (final AdminClient client = AdminClient.create(getStandardProperties())) {
+            TopicDescription result =
+                    client.describeTopics(Collections.singleton(topic)).all().get().get(topic);
+            return result.partitions().get(0).leader().id();
+        }
     }
 
     @Override
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index 28c15a8..560d8d8 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -170,8 +170,7 @@
     }
 
     private Map<String, TopicDescription> describeExternalTopics() {
-        final AdminClient adminClient = AdminClient.create(getStandardProps());
-        try {
+        try (final AdminClient adminClient = AdminClient.create(getStandardProps())) {
             final List<String> topics =
                     adminClient.listTopics().listings().get().stream()
                             .filter(listing -> !listing.isInternal())