[FLINK-24765][kafka] Bump Kafka version to 2.8
By bumping the kafka version to the latest 2.x release we include the
latest patches of Kafka which should harden our tests.
diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
index 7520b94..12fa08f 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
@@ -71,7 +71,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>2.4.1</version>
+ <version>2.8.1</version>
</dependency>
<!-- The following dependencies are for connector/format sql-jars that
diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml
index 12568c7..49b6cbb 100644
--- a/flink-connector-kafka/pom.xml
+++ b/flink-connector-kafka/pom.xml
@@ -36,7 +36,7 @@
<packaging>jar</packaging>
<properties>
- <kafka.version>2.4.1</kafka.version>
+ <kafka.version>2.8.1</kafka.version>
</properties>
<dependencies>
@@ -111,6 +111,15 @@
<scope>test</scope>
</dependency>
+ <!-- Required to execute the kafka server for testing. Please change the zookeeper version accordingly when changing the Kafka version
+ https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/gradle/dependencies.gradle#L122 -->
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.5.9</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index aec1edf..65616d2 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -33,7 +33,6 @@
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Properties;
-import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkState;
@@ -123,12 +122,6 @@
super.close(timeout);
}
- @Override
- public void close(long timeout, TimeUnit unit) {
- closed = true;
- super.close(timeout, unit);
- }
-
public boolean isClosed() {
return closed;
}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 5970158..ca81cc8 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -86,7 +86,6 @@
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -1059,7 +1058,7 @@
e);
} finally {
if (producer != null) {
- producer.close(0, TimeUnit.SECONDS);
+ producer.close(Duration.ofSeconds(0));
}
}
}
@@ -1082,7 +1081,7 @@
producer.initTransactions();
} finally {
if (producer != null) {
- producer.close(0, TimeUnit.SECONDS);
+ producer.close(Duration.ofSeconds(0));
}
}
}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
index c103bc3..a424a81 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
@@ -24,6 +24,7 @@
import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -54,7 +55,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
/** Internal flink kafka producer. */
@PublicEvolving
@@ -126,6 +126,13 @@
}
@Override
+ public void sendOffsetsToTransaction(
+ Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata)
+ throws ProducerFencedException {
+ kafkaProducer.sendOffsetsToTransaction(map, consumerGroupMetadata);
+ }
+
+ @Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return kafkaProducer.send(record);
}
@@ -155,20 +162,6 @@
}
@Override
- public void close(long timeout, TimeUnit unit) {
- synchronized (producerClosingLock) {
- kafkaProducer.close(timeout, unit);
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Closed internal KafkaProducer {}. Stacktrace: {}",
- System.identityHashCode(this),
- Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
- }
- closed = true;
- }
- }
-
- @Override
public void close(Duration duration) {
synchronized (producerClosingLock) {
kafkaProducer.close(duration);
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java
index b406bed..cb04ed2 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java
@@ -34,7 +34,7 @@
@Override
public Double getValue() {
- return kafkaMetric.value();
+ return (Double) kafkaMetric.metricValue();
}
public void setKafkaMetric(Metric kafkaMetric) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
index 51e7d13..23617b5 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
@@ -32,6 +32,6 @@
@Override
public Double getValue() {
- return kafkaMetric.value();
+ return (Double) kafkaMetric.metricValue();
}
}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index 6968bea..6097cbf 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -310,7 +310,8 @@
assertNotNull(clientId);
assertTrue(clientId.startsWith(clientIdPrefix));
assertEquals(
- defaultTimeoutMs, Whitebox.getInternalState(adminClient, "defaultTimeoutMs"));
+ defaultTimeoutMs,
+ Whitebox.getInternalState(adminClient, "defaultApiTimeoutMs"));
KafkaConsumer<?, ?> consumer =
(KafkaConsumer<?, ?>) Whitebox.getInternalState(enumerator, "consumer");
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index e635ecb..cdcbe3b 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -122,6 +122,8 @@
import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilNoJobIsRunning;
import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -205,9 +207,9 @@
final TimeoutException timeoutException = optionalTimeoutException.get();
if (useNewSource) {
- assertEquals(
- "Timed out waiting for a node assignment.",
- timeoutException.getMessage());
+ assertThat(
+ timeoutException.getCause().getMessage(),
+ containsString("Timed out waiting for a node assignment."));
} else {
assertEquals(
"Timeout expired while fetching topic metadata",
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 95e3241..4a164e5 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -31,7 +31,6 @@
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
-import kafka.metrics.KafkaMetricsReporter;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.commons.collections.list.UnmodifiableList;
@@ -67,8 +66,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import scala.collection.mutable.ArraySeq;
-
import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -452,12 +449,7 @@
try {
scala.Option<String> stringNone = scala.Option.apply(null);
- KafkaServer server =
- new KafkaServer(
- kafkaConfig,
- Time.SYSTEM,
- stringNone,
- new ArraySeq<KafkaMetricsReporter>(0));
+ KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, false);
server.startup();
return server;
} catch (KafkaException e) {
diff --git a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
index 452683a..553ccdf 100644
--- a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
@@ -6,4 +6,4 @@
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- org.apache.kafka:kafka-clients:2.4.1
+- org.apache.kafka:kafka-clients:2.8.1