| commit | 95ba4e67d856ba394cb4761e8092ce45a555cfa1 | [log] [tgz] |
|---|---|---|
| author | Fabian Paul <fabianpaul@ververica.com> | Wed Dec 08 15:39:45 2021 +0100 |
| committer | Chesnay Schepler <chesnay@apache.org> | Thu Jan 26 16:40:41 2023 +0100 |
| tree | 1b2f99cb044c5d70f0a9c8e506fb52dee300f1f5 | |
| parent | 231a84c9f37ce5bea6fa6439e6431c3a8a15ba51 [diff] |
[hotfix][tests] Close Kafka AdminClients to prevent resource leaks
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 8297bdd..26b16e7 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -350,10 +350,11 @@ @Override public int getLeaderToShutDown(String topic) throws Exception { - AdminClient client = AdminClient.create(getStandardProperties()); - TopicDescription result = - client.describeTopics(Collections.singleton(topic)).all().get().get(topic); - return result.partitions().get(0).leader().id(); + try (final AdminClient client = AdminClient.create(getStandardProperties())) { + TopicDescription result = + client.describeTopics(Collections.singleton(topic)).all().get().get(topic); + return result.partitions().get(0).leader().id(); + } } @Override
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java index 28c15a8..560d8d8 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -170,8 +170,7 @@ } private Map<String, TopicDescription> describeExternalTopics() { - final AdminClient adminClient = AdminClient.create(getStandardProps()); - try { + try (final AdminClient adminClient = AdminClient.create(getStandardProps())) { final List<String> topics = adminClient.listTopics().listings().get().stream() .filter(listing -> !listing.isInternal())