[FLINK-24281][connectors/kafka] Migrate all format tests from FlinkKafkaProducer to KafkaSink
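
The deprecated FlinkKafkaProducer is replaced with the unified KafkaSink in
KafkaChangelogTableITCase and in the Confluent schema registry end-to-end example, and the
repeated producer setup in the changelog ITCase is factored into a writeRecordsToKafka helper.
Because the exactly-once KafkaSink writes through Kafka transactions, KafkaTableTestBase also
raises the test broker's transaction timeout limit.

The sketch below summarizes the mapping the patch applies. It is a hand-written illustration,
not code from the patch; "topic", "schema", "props" and "partitioner" are placeholder
identifiers.

    // before: deprecated FlinkKafkaProducer attached via addSink(...)
    stream.addSink(
            new FlinkKafkaProducer<>(
                    topic,
                    schema,
                    props,
                    partitioner,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                    FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));

    // after: unified KafkaSink attached via sinkTo(...)
    stream.sinkTo(
            KafkaSink.<String>builder()
                    .setBootstrapServers(
                            props.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic(topic)
                                    .setValueSerializationSchema(schema)
                                    .setPartitioner(partitioner)
                                    .build())
                    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    .build());
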
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index 70d5060..501d470 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -21,12 +21,15 @@
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableResult;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.Before;
import org.junit.Test;
@@ -36,8 +39,6 @@
import java.util.List;
import java.util.Properties;
-import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE;
-import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic.EXACTLY_ONCE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.readLines;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.waitingExpectedResults;
@@ -65,23 +66,8 @@
// ---------- Write the Debezium json into Kafka -------------------
List<String> lines = readLines("debezium-data-schema-exclude.txt");
- DataStreamSource<String> stream = env.fromCollection(lines);
- SerializationSchema<String> serSchema = new SimpleStringSchema();
- FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
-
- // the producer must not produce duplicates
- Properties producerProperties = getStandardProps();
- producerProperties.setProperty("retries", "0");
try {
- stream.addSink(
- new FlinkKafkaProducer<>(
- topic,
- serSchema,
- producerProperties,
- partitioner,
- EXACTLY_ONCE,
- DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
- env.execute("Write sequence");
+ writeRecordsToKafka(topic, lines);
} catch (Exception e) {
throw new Exception("Failed to write debezium data to Kafka.", e);
}
@@ -208,23 +194,8 @@
// ---------- Write the Canal json into Kafka -------------------
List<String> lines = readLines("canal-data.txt");
- DataStreamSource<String> stream = env.fromCollection(lines);
- SerializationSchema<String> serSchema = new SimpleStringSchema();
- FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
-
- // the producer must not produce duplicates
- Properties producerProperties = getStandardProps();
- producerProperties.setProperty("retries", "0");
try {
- stream.addSink(
- new FlinkKafkaProducer<>(
- topic,
- serSchema,
- producerProperties,
- partitioner,
- EXACTLY_ONCE,
- DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
- env.execute("Write sequence");
+ writeRecordsToKafka(topic, lines);
} catch (Exception e) {
throw new Exception("Failed to write canal data to Kafka.", e);
}
@@ -361,23 +332,8 @@
// ---------- Write the Maxwell json into Kafka -------------------
List<String> lines = readLines("maxwell-data.txt");
- DataStreamSource<String> stream = env.fromCollection(lines);
- SerializationSchema<String> serSchema = new SimpleStringSchema();
- FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
-
- // the producer must not produce duplicates
- Properties producerProperties = getStandardProps();
- producerProperties.setProperty("retries", "0");
try {
- stream.addSink(
- new FlinkKafkaProducer<>(
- topic,
- serSchema,
- producerProperties,
- partitioner,
- EXACTLY_ONCE,
- DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
- env.execute("Write sequence");
+ writeRecordsToKafka(topic, lines);
} catch (Exception e) {
throw new Exception("Failed to write maxwell data to Kafka.", e);
}
@@ -492,4 +448,28 @@
tableResult.getJobClient().get().cancel().get(); // stop the job
deleteTestTopic(topic);
}
+
+ private void writeRecordsToKafka(String topic, List<String> lines) throws Exception {
+ DataStreamSource<String> stream = env.fromCollection(lines);
+ SerializationSchema<String> serSchema = new SimpleStringSchema();
+ FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
+
+ // only the bootstrap servers are read from these properties; the KafkaSink below is
+ // configured with an EXACTLY_ONCE delivery guarantee, so the sink itself prevents duplicates
+ Properties producerProperties = getStandardProps();
+ stream.sinkTo(
+ KafkaSink.<String>builder()
+ .setBootstrapServers(
+ producerProperties.getProperty(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+ .setRecordSerializer(
+ KafkaRecordSerializationSchema.builder()
+ .setTopic(topic)
+ .setValueSerializationSchema(serSchema)
+ .setPartitioner(partitioner)
+ .build())
+ .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+ .build());
+ env.execute("Write sequence");
+ }
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index a562ec7..28c15a8 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -76,6 +76,9 @@
}.withEmbeddedZookeeper()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
+ .withEnv(
+ "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
+ String.valueOf(Duration.ofHours(2).toMillis()))
// Disable log deletion to prevent records from being deleted during test run
.withEnv("KAFKA_LOG_RETENTION_MS", "-1");
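
The KAFKA_TRANSACTION_MAX_TIMEOUT_MS environment variable maps to the broker setting
transaction.max.timeout.ms in the Kafka test container. Brokers reject transactional producers
whose transaction.timeout.ms exceeds that limit (the broker default is 15 minutes), so the
limit is raised to two hours to give the exactly-once KafkaSink's transaction timeout enough
headroom. If an individual test ever needed a tighter producer-side timeout, it could be passed
through the sink's producer config; a minimal sketch, reusing the test base's getStandardProps()
and a placeholder topic name:

    // hypothetical: pin the producer-side transaction timeout below the broker limit
    Properties props = getStandardProps();
    props.setProperty(
            ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, // "transaction.timeout.ms"
            String.valueOf(Duration.ofMinutes(15).toMillis()));
    KafkaSink<String> sink =
            KafkaSink.<String>builder()
                    .setBootstrapServers(
                            props.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
                    .setKafkaProducerConfig(props)
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("some-topic")
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build())
                    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    .build();
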
diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
index 840aa9e..e3f4149 100644
--- a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
@@ -20,16 +20,18 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import example.avro.User;
import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
@@ -74,22 +76,36 @@
SingleOutputStreamOperator<String> mapToString =
input.map((MapFunction<User, String>) SpecificRecordBase::toString);
- FlinkKafkaProducer<String> stringFlinkKafkaProducer =
- new FlinkKafkaProducer<>(
- parameterTool.getRequired("output-string-topic"),
- new SimpleStringSchema(),
- config);
- mapToString.addSink(stringFlinkKafkaProducer);
+ KafkaSink<String> stringSink =
+ KafkaSink.<String>builder()
+ .setBootstrapServers(
+ config.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+ .setRecordSerializer(
+ KafkaRecordSerializationSchema.builder()
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setTopic(parameterTool.getRequired("output-string-topic"))
+ .build())
+ .setKafkaProducerConfig(config)
+ .build();
+ mapToString.sinkTo(stringSink);
- FlinkKafkaProducer<User> avroFlinkKafkaProducer =
- new FlinkKafkaProducer<>(
- parameterTool.getRequired("output-avro-topic"),
- ConfluentRegistryAvroSerializationSchema.forSpecific(
- User.class,
- parameterTool.getRequired("output-subject"),
- schemaRegistryUrl),
- config);
- input.addSink(avroFlinkKafkaProducer);
+ KafkaSink<User> avroSink =
+ KafkaSink.<User>builder()
+ .setBootstrapServers(
+ config.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+ .setRecordSerializer(
+ KafkaRecordSerializationSchema.builder()
+ .setValueSerializationSchema(
+ ConfluentRegistryAvroSerializationSchema
+ .forSpecific(
+ User.class,
+ parameterTool.getRequired(
+ "output-subject"),
+ schemaRegistryUrl))
+ .setTopic(parameterTool.getRequired("output-avro-topic"))
+ .build())
+ .build();
+ input.sinkTo(avroSink);
env.execute("Kafka Confluent Schema Registry AVRO Example");
}
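
The example's input side still reads User records with the pre-existing FlinkKafkaConsumer and
ConfluentRegistryAvroDeserializationSchema, so only the write path changes in this file. For
completeness, a read-back of the Avro output topic could use the same deserialization schema to
verify the round trip; this is a hypothetical check, not part of the patch, and the group id is
a placeholder:

    // read the records written by avroSink back from the output topic
    Properties consumerConfig = new Properties();
    consumerConfig.setProperty(
            "bootstrap.servers", config.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
    consumerConfig.setProperty("group.id", "avro-readback");
    DataStreamSource<User> readBack =
            env.addSource(
                    new FlinkKafkaConsumer<>(
                            parameterTool.getRequired("output-avro-topic"),
                            ConfluentRegistryAvroDeserializationSchema.forSpecific(
                                    User.class, schemaRegistryUrl),
                            consumerConfig));
    readBack.print();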