[FLINK-24282][connectors/kafka] Make topic selector for KafkaSink serializable

It is possible to calculate the target topic per record. Therefore users
can provide a lambda when constructing the KafkaSink. Before this
commit the lambda was not marked as serializable and could not be
transferred to the workers.
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index 30ff2f8..e2d0d29 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -20,16 +20,14 @@
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
-import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
-
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.serialization.Serializer;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.OptionalInt;
 import java.util.function.Function;
@@ -119,7 +117,7 @@
      * @return {@code this}
      */
     public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setTopicSelector(
-            Function<? super T, String> topicSelector) {
+            TopicSelector<? super T> topicSelector) {
         checkState(this.topicSelector == null, "Topic selector already set.");
         KafkaRecordSerializationSchemaBuilder<T> self = self();
         self.topicSelector = new CachingTopicSelector<>(checkNotNull(topicSelector));
@@ -252,34 +250,25 @@
         checkState(keySerializationSchema == null, "Key serializer already set.");
     }
 
-    private static class CachingTopicSelector<IN> implements Function<IN, String> {
+    private static class CachingTopicSelector<IN> implements Function<IN, String>, Serializable {
 
-        private final LoadingCache<IN, String> cache;
+        private static final int CACHE_RESET_SIZE = 5;
+        private final Map<IN, String> cache;
+        private final TopicSelector<IN> topicSelector;
 
-        CachingTopicSelector(Function<IN, String> topicSelector) {
-            this.cache =
-                    CacheBuilder.newBuilder()
-                            .maximumSize(5)
-                            .build(new TopicSelectorCacheLoader<>(topicSelector));
+        CachingTopicSelector(TopicSelector<IN> topicSelector) {
+            this.topicSelector = topicSelector;
+            this.cache = new HashMap<>();
         }
 
         @Override
         public String apply(IN in) {
-            return cache.getUnchecked(in);
-        }
-    }
-
-    private static class TopicSelectorCacheLoader<IN> extends CacheLoader<IN, String> {
-
-        private final Function<IN, String> topicSelector;
-
-        TopicSelectorCacheLoader(Function<IN, String> topicSelector) {
-            this.topicSelector = topicSelector;
-        }
-
-        @Override
-        public String load(IN in) throws Exception {
-            return topicSelector.apply(in);
+            final String topic = cache.getOrDefault(in, topicSelector.apply(in));
+            cache.put(in, topic);
+            if (cache.size() == CACHE_RESET_SIZE) {
+                cache.clear();
+            }
+            return topic;
         }
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TopicSelector.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TopicSelector.java
new file mode 100644
index 0000000..2a20754
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TopicSelector.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+/**
+ * Selects a topic for the incoming record.
+ *
+ * @param <IN> type of the incoming record
+ */
+@PublicEvolving
+public interface TopicSelector<IN> extends Function<IN, String>, Serializable {}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
index 3eb6450..80c92d3 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
@@ -104,7 +104,7 @@
 
     @Test
     public void testSerializeRecordWithTopicSelector() {
-        final Function<String, String> topicSelector =
+        final TopicSelector<String> topicSelector =
                 (e) -> {
                     if (e.equals("a")) {
                         return "topic-a";
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index fe4ae3c..6fc7511 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -19,6 +19,7 @@
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -60,7 +61,6 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -117,6 +117,7 @@
     private static final int ZK_TIMEOUT_MILLIS = 30000;
     private static final short TOPIC_REPLICATION_FACTOR = 1;
     private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1);
+    private static final RecordSerializer serializer = new RecordSerializer();
     private static AdminClient admin;
 
     private String topic;
@@ -307,7 +308,11 @@
                 new KafkaSinkBuilder<Long>()
                         .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                         .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
-                        .setRecordSerializer(new RecordSerializer(topic));
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic(topic)
+                                        .setValueSerializationSchema(new RecordSerializer())
+                                        .build());
         if (transactionalIdPrefix == null) {
             transactionalIdPrefix = "kafka-sink";
         }
@@ -335,7 +340,11 @@
                 new KafkaSinkBuilder<Long>()
                         .setDeliverGuarantee(guarantee)
                         .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
-                        .setRecordSerializer(new RecordSerializer(topic))
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic(topic)
+                                        .setValueSerializationSchema(new RecordSerializer())
+                                        .build())
                         .setTransactionalIdPrefix("kafka-sink")
                         .build());
         env.execute();
@@ -361,7 +370,11 @@
                 new KafkaSinkBuilder<Long>()
                         .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
                         .setDeliverGuarantee(deliveryGuarantee)
-                        .setRecordSerializer(new RecordSerializer(topic))
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic(topic)
+                                        .setValueSerializationSchema(new RecordSerializer())
+                                        .build())
                         .setTransactionalIdPrefix("kafka-sink")
                         .build());
         env.execute();
@@ -447,20 +460,13 @@
         return collectedRecords;
     }
 
-    private static class RecordSerializer implements KafkaRecordSerializationSchema<Long> {
-
-        private final String topic;
-
-        public RecordSerializer(String topic) {
-            this.topic = topic;
-        }
+    private static class RecordSerializer implements SerializationSchema<Long> {
 
         @Override
-        public ProducerRecord<byte[], byte[]> serialize(
-                Long element, KafkaSinkContext context, Long timestamp) {
+        public byte[] serialize(Long element) {
             final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
             buffer.putLong(element);
-            return new ProducerRecord<>(topic, 0, null, null, buffer.array());
+            return buffer.array();
         }
     }