[FLINK-24282][connectors/kafka] Make topic selector for KafkaSink serializable
The target topic can be computed per record, so users can pass a
lambda as the topic selector when constructing the KafkaSink. Before
this commit the selector was a plain java.util.function.Function,
which is not Serializable, so the lambda could not be transferred to
the workers. The selector is now the serializable TopicSelector
interface, and the non-serializable Guava LoadingCache in the caching
wrapper is replaced by a small HashMap that is cleared once it holds
five entries.
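
For illustration, a sink with a per-record topic choice can be set up
roughly as follows (the broker address, topic names and routing
condition are made up for this sketch; SimpleStringSchema stands in
for the user's value serializer):

    TopicSelector<String> topicSelector =
            e -> e.startsWith("err") ? "errors" : "events";

    KafkaSink<String> sink =
            KafkaSink.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    // the lambda above targets TopicSelector and is
                                    // therefore Serializable
                                    .setTopicSelector(topicSelector)
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build())
                    .build();

Lambdas passed inline to setTopicSelector keep compiling; a selector
that was declared as java.util.function.Function has to be re-typed
as TopicSelector, as in the test change below.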
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index 30ff2f8..e2d0d29 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -20,16 +20,14 @@
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
-import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
-
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.serialization.Serializer;
import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;
import java.util.function.Function;
@@ -119,7 +117,7 @@
* @return {@code this}
*/
public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setTopicSelector(
- Function<? super T, String> topicSelector) {
+ TopicSelector<? super T> topicSelector) {
checkState(this.topicSelector == null, "Topic selector already set.");
KafkaRecordSerializationSchemaBuilder<T> self = self();
self.topicSelector = new CachingTopicSelector<>(checkNotNull(topicSelector));
@@ -252,34 +250,25 @@
checkState(keySerializationSchema == null, "Key serializer already set.");
}
- private static class CachingTopicSelector<IN> implements Function<IN, String> {
+ private static class CachingTopicSelector<IN> implements Function<IN, String>, Serializable {
- private final LoadingCache<IN, String> cache;
+ private static final int CACHE_RESET_SIZE = 5;
+ private final Map<IN, String> cache;
+ private final TopicSelector<IN> topicSelector;
- CachingTopicSelector(Function<IN, String> topicSelector) {
- this.cache =
- CacheBuilder.newBuilder()
- .maximumSize(5)
- .build(new TopicSelectorCacheLoader<>(topicSelector));
+ CachingTopicSelector(TopicSelector<IN> topicSelector) {
+ this.topicSelector = topicSelector;
+ this.cache = new HashMap<>();
}
@Override
public String apply(IN in) {
- return cache.getUnchecked(in);
- }
- }
-
- private static class TopicSelectorCacheLoader<IN> extends CacheLoader<IN, String> {
-
- private final Function<IN, String> topicSelector;
-
- TopicSelectorCacheLoader(Function<IN, String> topicSelector) {
- this.topicSelector = topicSelector;
- }
-
- @Override
- public String load(IN in) throws Exception {
- return topicSelector.apply(in);
+ final String topic = cache.getOrDefault(in, topicSelector.apply(in));
+ cache.put(in, topic);
+ if (cache.size() == CACHE_RESET_SIZE) {
+ cache.clear();
+ }
+ return topic;
}
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TopicSelector.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TopicSelector.java
new file mode 100644
index 0000000..2a20754
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TopicSelector.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+/**
+ * Selects a topic for the incoming record.
+ *
+ * @param <IN> type of the incoming record
+ */
+@PublicEvolving
+public interface TopicSelector<IN> extends Function<IN, String>, Serializable {}
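
Because TopicSelector extends java.io.Serializable, a lambda targeted
at it survives Java serialization, which is what lets Flink ship the
selector with the job graph. A tiny standalone check (illustrative
only, class name and sample values are not part of the patch):

    import org.apache.flink.connector.kafka.sink.TopicSelector;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    public class TopicSelectorRoundTrip {
        public static void main(String[] args) throws Exception {
            TopicSelector<String> selector =
                    e -> e.startsWith("err") ? "errors" : "events";

            // Serialize the lambda, as Flink does when distributing the sink.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(selector);
            }

            // Deserialize it again and use it; a plain java.util.function.Function
            // lambda would have failed with a NotSerializableException instead.
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                TopicSelector<String> copy = (TopicSelector<String>) in.readObject();
                System.out.println(copy.apply("err-1")); // prints "errors"
            }
        }
    }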
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
index 3eb6450..80c92d3 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
@@ -104,7 +104,7 @@
@Test
public void testSerializeRecordWithTopicSelector() {
- final Function<String, String> topicSelector =
+ final TopicSelector<String> topicSelector =
(e) -> {
if (e.equals("a")) {
return "topic-a";
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index fe4ae3c..6fc7511 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -19,6 +19,7 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -60,7 +61,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.After;
import org.junit.AfterClass;
@@ -117,6 +117,7 @@
private static final int ZK_TIMEOUT_MILLIS = 30000;
private static final short TOPIC_REPLICATION_FACTOR = 1;
private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1);
+ private static final RecordSerializer serializer = new RecordSerializer();
private static AdminClient admin;
private String topic;
@@ -307,7 +308,11 @@
new KafkaSinkBuilder<Long>()
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
- .setRecordSerializer(new RecordSerializer(topic));
+ .setRecordSerializer(
+ KafkaRecordSerializationSchema.builder()
+ .setTopic(topic)
+ .setValueSerializationSchema(new RecordSerializer())
+ .build());
if (transactionalIdPrefix == null) {
transactionalIdPrefix = "kafka-sink";
}
@@ -335,7 +340,11 @@
new KafkaSinkBuilder<Long>()
.setDeliverGuarantee(guarantee)
.setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
- .setRecordSerializer(new RecordSerializer(topic))
+ .setRecordSerializer(
+ KafkaRecordSerializationSchema.builder()
+ .setTopic(topic)
+ .setValueSerializationSchema(new RecordSerializer())
+ .build())
.setTransactionalIdPrefix("kafka-sink")
.build());
env.execute();
@@ -361,7 +370,11 @@
new KafkaSinkBuilder<Long>()
.setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
.setDeliverGuarantee(deliveryGuarantee)
- .setRecordSerializer(new RecordSerializer(topic))
+ .setRecordSerializer(
+ KafkaRecordSerializationSchema.builder()
+ .setTopic(topic)
+ .setValueSerializationSchema(new RecordSerializer())
+ .build())
.setTransactionalIdPrefix("kafka-sink")
.build());
env.execute();
@@ -447,20 +460,13 @@
return collectedRecords;
}
- private static class RecordSerializer implements KafkaRecordSerializationSchema<Long> {
-
- private final String topic;
-
- public RecordSerializer(String topic) {
- this.topic = topic;
- }
+ private static class RecordSerializer implements SerializationSchema<Long> {
@Override
- public ProducerRecord<byte[], byte[]> serialize(
- Long element, KafkaSinkContext context, Long timestamp) {
+ public byte[] serialize(Long element) {
final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.putLong(element);
- return new ProducerRecord<>(topic, 0, null, null, buffer.array());
+ return buffer.array();
}
}