[FLINK-24765][kafka] Bump Kafka version to 2.8

By bumping the Kafka version to the latest 2.x release we pick up the
latest Kafka patches, which should harden our tests.
diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
index 7520b94..12fa08f 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
@@ -71,7 +71,7 @@
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka-clients</artifactId>
-			<version>2.4.1</version>
+			<version>2.8.1</version>
 		</dependency>
 
 		<!-- The following dependencies are for connector/format sql-jars that
diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml
index 12568c7..49b6cbb 100644
--- a/flink-connector-kafka/pom.xml
+++ b/flink-connector-kafka/pom.xml
@@ -36,7 +36,7 @@
 	<packaging>jar</packaging>
 
 	<properties>
-		<kafka.version>2.4.1</kafka.version>
+		<kafka.version>2.8.1</kafka.version>
 	</properties>
 
 	<dependencies>
@@ -111,6 +111,15 @@
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Required to execute the kafka server for testing. Please change the zookeeper version accordingly when changing the Kafka version
+             https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/gradle/dependencies.gradle#L122 -->
+		<dependency>
+			<groupId>org.apache.zookeeper</groupId>
+			<artifactId>zookeeper</artifactId>
+			<version>3.5.9</version>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.testcontainers</groupId>
 			<artifactId>kafka</artifactId>
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index aec1edf..65616d2 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -33,7 +33,6 @@
 import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -123,12 +122,6 @@
         super.close(timeout);
     }
 
-    @Override
-    public void close(long timeout, TimeUnit unit) {
-        closed = true;
-        super.close(timeout, unit);
-    }
-
     public boolean isClosed() {
         return closed;
     }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 5970158..ca81cc8 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -86,7 +86,6 @@
 import java.util.Set;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -1059,7 +1058,7 @@
                         e);
             } finally {
                 if (producer != null) {
-                    producer.close(0, TimeUnit.SECONDS);
+                    producer.close(Duration.ofSeconds(0));
                 }
             }
         }
@@ -1082,7 +1081,7 @@
                 producer.initTransactions();
             } finally {
                 if (producer != null) {
-                    producer.close(0, TimeUnit.SECONDS);
+                    producer.close(Duration.ofSeconds(0));
                 }
             }
         }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
index c103bc3..a424a81 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
@@ -24,6 +24,7 @@
 
 import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
 
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -54,7 +55,6 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 /** Internal flink kafka producer. */
 @PublicEvolving
@@ -126,6 +126,13 @@
     }
 
     @Override
+    public void sendOffsetsToTransaction(
+            Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata)
+            throws ProducerFencedException {
+        kafkaProducer.sendOffsetsToTransaction(map, consumerGroupMetadata);
+    }
+
+    @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
         return kafkaProducer.send(record);
     }
@@ -155,20 +162,6 @@
     }
 
     @Override
-    public void close(long timeout, TimeUnit unit) {
-        synchronized (producerClosingLock) {
-            kafkaProducer.close(timeout, unit);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(
-                        "Closed internal KafkaProducer {}. Stacktrace: {}",
-                        System.identityHashCode(this),
-                        Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
-            }
-            closed = true;
-        }
-    }
-
-    @Override
     public void close(Duration duration) {
         synchronized (producerClosingLock) {
             kafkaProducer.close(duration);
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java
index b406bed..cb04ed2 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java
@@ -34,7 +34,7 @@
 
     @Override
     public Double getValue() {
-        return kafkaMetric.value();
+        return (Double) kafkaMetric.metricValue();
     }
 
     public void setKafkaMetric(Metric kafkaMetric) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
index 51e7d13..23617b5 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
@@ -32,6 +32,6 @@
 
     @Override
     public Double getValue() {
-        return kafkaMetric.value();
+        return (Double) kafkaMetric.metricValue();
     }
 }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index 6968bea..6097cbf 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -310,7 +310,8 @@
             assertNotNull(clientId);
             assertTrue(clientId.startsWith(clientIdPrefix));
             assertEquals(
-                    defaultTimeoutMs, Whitebox.getInternalState(adminClient, "defaultTimeoutMs"));
+                    defaultTimeoutMs,
+                    Whitebox.getInternalState(adminClient, "defaultApiTimeoutMs"));
 
             KafkaConsumer<?, ?> consumer =
                     (KafkaConsumer<?, ?>) Whitebox.getInternalState(enumerator, "consumer");
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index e635ecb..cdcbe3b 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -122,6 +122,8 @@
 import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilNoJobIsRunning;
 import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -205,9 +207,9 @@
 
                 final TimeoutException timeoutException = optionalTimeoutException.get();
                 if (useNewSource) {
-                    assertEquals(
-                            "Timed out waiting for a node assignment.",
-                            timeoutException.getMessage());
+                    assertThat(
+                            timeoutException.getCause().getMessage(),
+                            containsString("Timed out waiting for a node assignment."));
                 } else {
                     assertEquals(
                             "Timeout expired while fetching topic metadata",
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 95e3241..4a164e5 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -31,7 +31,6 @@
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
 
-import kafka.metrics.KafkaMetricsReporter;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import org.apache.commons.collections.list.UnmodifiableList;
@@ -67,8 +66,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import scala.collection.mutable.ArraySeq;
-
 import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -452,12 +449,7 @@
 
             try {
                 scala.Option<String> stringNone = scala.Option.apply(null);
-                KafkaServer server =
-                        new KafkaServer(
-                                kafkaConfig,
-                                Time.SYSTEM,
-                                stringNone,
-                                new ArraySeq<KafkaMetricsReporter>(0));
+                KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, false);
                 server.startup();
                 return server;
             } catch (KafkaException e) {
diff --git a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
index 452683a..553ccdf 100644
--- a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
@@ -6,4 +6,4 @@
 
 This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.kafka:kafka-clients:2.4.1
+- org.apache.kafka:kafka-clients:2.8.1