Support Pulsar schema for pulsar kafka client wrapper (#4534)
Fixes https://github.com/apache/pulsar/issues/4228
Master Issue: https://github.com/apache/pulsar/issues/4228
### Motivation
Use Pulsar schema in pulsar kafka client.
### Modifications
Support schema of pulsar for pulsar kafka client
### Verifying this change
Add Unit test
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
new file mode 100644
index 0000000..3e39b8d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Bar;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Foo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+public class ConsumerAvroExample {
+
+ public static void main(String[] args) {
+ String topic = "persistent://public/default/test-avro";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "pulsar://localhost:6650");
+ props.put("group.id", "my-subscription-name");
+ props.put("enable.auto.commit", "false");
+ props.put("key.deserializer", IntegerDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+ Bar bar = new Bar();
+ bar.setField1(true);
+
+ Foo foo = new Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+
+ @SuppressWarnings("resource")
+ Consumer<Foo, Bar> consumer = new KafkaConsumer<>(props, fooSchema, barSchema);
+ consumer.subscribe(Arrays.asList(topic));
+
+ while (true) {
+ ConsumerRecords<Foo, Bar> records = consumer.poll(100);
+ records.forEach(record -> {
+ log.info("Received record: {}", record);
+ });
+
+ // Commit last offset
+ consumer.commitSync();
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ConsumerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
new file mode 100644
index 0000000..aa5e29a
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Bar;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Foo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class ProducerAvroExample {
+ public static void main(String[] args) {
+ String topic = "persistent://public/default/test-avro";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "pulsar://localhost:6650");
+
+ props.put("key.serializer", IntegerSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+ Bar bar = new Bar();
+ bar.setField1(true);
+
+ Foo foo = new Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+
+
+ Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);
+
+ for (int i = 0; i < 10; i++) {
+ producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));
+ log.info("Message {} sent successfully", i);
+ }
+
+ producer.close();
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java
new file mode 100644
index 0000000..8120900
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@ToString
+@EqualsAndHashCode
+public class Bar {
+ private boolean field1;
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java
new file mode 100644
index 0000000..d584f51
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.avro.reflect.Nullable;
+
+@Data
+@ToString
+@EqualsAndHashCode
+public class Foo {
+ @Nullable
+ private String field1;
+ @Nullable
+ private String field2;
+ private int field3;
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index bbab13e..15e23a2 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -41,7 +41,6 @@
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
@@ -49,20 +48,22 @@
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -74,8 +75,8 @@
private final PulsarClient client;
- private final Deserializer<K> keyDeserializer;
- private final Deserializer<V> valueDeserializer;
+ private final Schema<K> keySchema;
+ private final Schema<V> valueSchema;
private final String groupId;
private final boolean isAutoCommit;
@@ -110,66 +111,75 @@
private final BlockingQueue<QueueItem> receivedMessages = new ArrayBlockingQueue<>(1000);
public PulsarKafkaConsumer(Map<String, Object> configs) {
- this(configs, null, null);
+ this(new ConsumerConfig(configs), null, null);
}
public PulsarKafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer,
- Deserializer<V> valueDeserializer) {
- this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
- keyDeserializer, valueDeserializer);
+ Deserializer<V> valueDeserializer) {
+ this(new ConsumerConfig(configs),
+ new PulsarKafkaSchema<K>(keyDeserializer), new PulsarKafkaSchema<V>(valueDeserializer));
+ }
+
+ public PulsarKafkaConsumer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
+ this(new ConsumerConfig(configs), keySchema, valueSchema);
}
public PulsarKafkaConsumer(Properties properties) {
- this(properties, null, null);
+ this(new ConsumerConfig(properties), null, null);
}
public PulsarKafkaConsumer(Properties properties, Deserializer<K> keyDeserializer,
- Deserializer<V> valueDeserializer) {
- this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
- keyDeserializer, valueDeserializer);
+ Deserializer<V> valueDeserializer) {
+ this(new ConsumerConfig(properties),
+ new PulsarKafkaSchema<>(keyDeserializer), new PulsarKafkaSchema<>(valueDeserializer));
+ }
+
+ public PulsarKafkaConsumer(Properties properties, Schema<K> keySchema, Schema<V> valueSchema) {
+ this(new ConsumerConfig(properties), keySchema, valueSchema);
}
@SuppressWarnings("unchecked")
- private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer,
- Deserializer<V> valueDeserializer) {
+ private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
- if (keyDeserializer == null) {
- this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- Deserializer.class);
- this.keyDeserializer.configure(config.originals(), true);
+ if (keySchema == null) {
+ Deserializer<K> kafkaKeyDeserializer = consumerConfig.getConfiguredInstance(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ kafkaKeyDeserializer.configure(consumerConfig.originals(), true);
+ this.keySchema = new PulsarKafkaSchema<>(kafkaKeyDeserializer);
} else {
- this.keyDeserializer = keyDeserializer;
- config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ this.keySchema = keySchema;
+ consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
}
- if (valueDeserializer == null) {
- this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- Deserializer.class);
- this.valueDeserializer.configure(config.originals(), true);
+ if (valueSchema == null) {
+ Deserializer<V> kafkaValueDeserializer = consumerConfig.getConfiguredInstance(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ kafkaValueDeserializer.configure(consumerConfig.originals(), true);
+ this.valueSchema = new PulsarKafkaSchema<>(kafkaValueDeserializer);
} else {
- this.valueDeserializer = valueDeserializer;
- config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+ this.valueSchema = valueSchema;
+ consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
}
- groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
- isAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
- strategy = getStrategy(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
+ isAutoCommit = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+ strategy = getStrategy(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
log.info("Offset reset strategy has been assigned value {}", strategy);
- String serviceUrl = config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
+ String serviceUrl = consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
// If MAX_POLL_RECORDS_CONFIG is provided then use the config, else use default value.
- if(config.values().containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)){
- maxRecordsInSinglePoll = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
+ if(consumerConfig.values().containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)){
+ maxRecordsInSinglePoll = consumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
} else {
maxRecordsInSinglePoll = 1000;
}
- interceptors = (List) config.getConfiguredInstances(
+ interceptors = (List) consumerConfig.getConfiguredInstances(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
this.properties = new Properties();
- config.originals().forEach((k, v) -> properties.put(k, v));
+ consumerConfig.originals().forEach((k, v) -> properties.put(k, v));
ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
// Since this client instance is going to be used just for the consumers, we can enable Nagle to group
// all the acknowledgments sent to broker within a short time frame
@@ -352,7 +362,10 @@
}
K key = getKey(topic, msg);
- V value = valueDeserializer.deserialize(topic, msg.getData());
+ if (valueSchema instanceof PulsarKafkaSchema) {
+ ((PulsarKafkaSchema<V>) valueSchema).setTopic(topic);
+ }
+ V value = valueSchema.decode(msg.getData());
TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
long timestamp = msg.getPublishTime();
@@ -403,13 +416,18 @@
return null;
}
- if (keyDeserializer instanceof StringDeserializer) {
- return (K) msg.getKey();
- } else {
- // Assume base64 encoding
- byte[] data = Base64.getDecoder().decode(msg.getKey());
- return keyDeserializer.deserialize(topic, data);
+ if (keySchema instanceof PulsarKafkaSchema) {
+ PulsarKafkaSchema<K> pulsarKafkaSchema = (PulsarKafkaSchema) keySchema;
+ Deserializer<K> kafkaDeserializer = pulsarKafkaSchema.getKafkaDeserializer();
+ if (kafkaDeserializer instanceof StringDeserializer) {
+ return (K) msg.getKey();
+ }
+ pulsarKafkaSchema.setTopic(topic);
}
+ // Assume base64 encoding
+ byte[] data = Base64.getDecoder().decode(msg.getKey());
+ return keySchema.decode(data);
+
}
@Override
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 00c49bb..1c5758f 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -43,20 +43,21 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
+import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
-import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
-import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,59 +68,68 @@
private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
- private final Serializer<K> keySerializer;
- private final Serializer<V> valueSerializer;
+ private final Schema<K> keySchema;
+ private final Schema<V> valueSchema;
private final Partitioner partitioner;
private volatile Cluster cluster = Cluster.empty();
private List<ProducerInterceptor<K, V>> interceptors;
+ private final Properties properties;
+
public PulsarKafkaProducer(Map<String, Object> configs) {
- this(configs, null, null);
+ this(new ProducerConfig(configs), null, null);
}
public PulsarKafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer,
- Serializer<V> valueSerializer) {
- this(configs, new Properties(), keySerializer, valueSerializer);
+ Serializer<V> valueSerializer) {
+ this(new ProducerConfig(configs), new PulsarKafkaSchema<>(keySerializer), new PulsarKafkaSchema<>(valueSerializer));
+ }
+
+ public PulsarKafkaProducer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
+ this(new ProducerConfig(configs), keySchema, valueSchema);
}
public PulsarKafkaProducer(Properties properties) {
- this(properties, null, null);
+ this(new ProducerConfig(properties), null, null);
}
public PulsarKafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
- this(new HashMap<>(), properties, keySerializer, valueSerializer);
+ this(new ProducerConfig(properties), new PulsarKafkaSchema<>(keySerializer), new PulsarKafkaSchema<>(valueSerializer));
+ }
+
+ public PulsarKafkaProducer(Properties properties, Schema<K> keySchema, Schema<V> valueSchema) {
+ this(new ProducerConfig(properties), keySchema, valueSchema);
}
@SuppressWarnings({ "unchecked", "deprecation" })
- private PulsarKafkaProducer(Map<String, Object> conf, Properties properties, Serializer<K> keySerializer,
- Serializer<V> valueSerializer) {
- properties.forEach((k, v) -> conf.put((String) k, v));
+ private PulsarKafkaProducer(ProducerConfig producerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
- ProducerConfig producerConfig = new ProducerConfig(conf);
-
- if (keySerializer == null) {
- this.keySerializer = producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- Serializer.class);
- this.keySerializer.configure(producerConfig.originals(), true);
+ if (keySchema == null) {
+ Serializer<K> kafkaKeySerializer = producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ kafkaKeySerializer.configure(producerConfig.originals(), true);
+ this.keySchema = new PulsarKafkaSchema<>(kafkaKeySerializer);
} else {
- this.keySerializer = keySerializer;
+ this.keySchema = keySchema;
producerConfig.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
}
- if (valueSerializer == null) {
- this.valueSerializer = producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- Serializer.class);
- this.valueSerializer.configure(producerConfig.originals(), false);
+ if (valueSchema == null) {
+ Serializer<V> kafkaValueSerializer = producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ kafkaValueSerializer.configure(producerConfig.originals(), false);
+ this.valueSchema = new PulsarKafkaSchema<>(kafkaValueSerializer);
} else {
- this.valueSerializer = valueSerializer;
+ this.valueSchema = valueSchema;
producerConfig.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
}
partitioner = producerConfig.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
partitioner.configure(producerConfig.originals());
+ this.properties = new Properties();
+ producerConfig.originals().forEach((k, v) -> properties.put(k, v));
+
long keepAliveIntervalMs = Long.parseLong(properties.getProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "30000"));
String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
@@ -275,7 +285,7 @@
// Add the partitions info for the new topic
cluster = cluster.withPartitions(readPartitionsInfo(topic));
List<org.apache.pulsar.client.api.ProducerInterceptor> wrappedInterceptors = interceptors.stream()
- .map(interceptor -> new KafkaProducerInterceptorWrapper(interceptor, keySerializer, valueSerializer, topic))
+ .map(interceptor -> new KafkaProducerInterceptorWrapper(interceptor, keySchema, valueSchema, topic))
.collect(Collectors.toList());
return pulsarProducerBuilder.clone()
.topic(topic)
@@ -312,7 +322,10 @@
builder.eventTime(record.timestamp());
}
- byte[] value = valueSerializer.serialize(record.topic(), record.value());
+ if (valueSchema instanceof PulsarKafkaSchema) {
+ ((PulsarKafkaSchema<V>) valueSchema).setTopic(record.topic());
+ }
+ byte[] value = valueSchema.encode(record.value());
builder.value(value);
if (record.partition() != null) {
@@ -329,12 +342,14 @@
private String getKey(String topic, K key) {
// If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
- if (keySerializer instanceof StringSerializer) {
+ if (key instanceof String) {
return (String) key;
- } else {
- byte[] keyBytes = keySerializer.serialize(topic, key);
- return Base64.getEncoder().encodeToString(keyBytes);
}
+ if (keySchema instanceof PulsarKafkaSchema) {
+ ((PulsarKafkaSchema) keySchema).setTopic(topic);
+ }
+ byte[] keyBytes = keySchema.encode(key);
+ return Base64.getEncoder().encodeToString(keyBytes);
}
private RecordMetadata getRecordMetadata(String topic, TypedMessageBuilder<byte[]> msgBuilder, MessageId messageId,
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
index 24697cd..dbe827f 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
@@ -67,9 +67,9 @@
final private org.apache.kafka.clients.producer.ProducerInterceptor<K, V> kafkaProducerInterceptor;
// For serializer key/value, and to determine the deserializer for key/value.
- private final Serializer<K> keySerializer;
+ private final Schema<K> keySchema;
- private final Serializer<V> valueSerializer;
+ private final Schema<V> valueSchema;
// Keep the topic, as each Pulsar producer will tie to a Kafka topic, and ProducerInterceptor will tie to a Pulsar
// producer, it's safe to set it as final.
@@ -91,12 +91,12 @@
* @param topic Topic this {@link ProducerInterceptor} will be associated to.
*/
public KafkaProducerInterceptorWrapper(org.apache.kafka.clients.producer.ProducerInterceptor<K, V> kafkaProducerInterceptor,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer,
+ Schema<K> keySchema,
+ Schema<V> valueSchema,
String topic) {
this.kafkaProducerInterceptor = kafkaProducerInterceptor;
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
+ this.keySchema = keySchema;
+ this.valueSchema = valueSchema;
this.topic = topic;
}
@@ -163,7 +163,10 @@
private Message<byte[]> toPulsarMessage(ProducerRecord<K, V> producerRecord) {
TypedMessageBuilderImpl typedMessageBuilder = new TypedMessageBuilderImpl(null, scheme);
typedMessageBuilder.key(serializeKey(topic, producerRecord.key()));
- typedMessageBuilder.value(valueSerializer.serialize(topic, producerRecord.value()));
+ if (valueSchema instanceof PulsarKafkaSchema) {
+ ((PulsarKafkaSchema<V>) valueSchema).setTopic(topic);
+ }
+ typedMessageBuilder.value(valueSchema.encode(producerRecord.value()));
typedMessageBuilder.eventTime(eventTime);
typedMessageBuilder.property(KafkaMessageRouter.PARTITION_ID, partitionID);
return typedMessageBuilder.getMessage();
@@ -178,8 +181,14 @@
* @return Kafka record.
*/
private ProducerRecord<K, V> toKafkaRecord(Message<byte[]> message) {
- Deserializer valueDeserializer = getDeserializer(valueSerializer);
- V value = (V) valueDeserializer.deserialize(topic, message.getValue());
+ V value;
+ if (valueSchema instanceof PulsarKafkaSchema) {
+ PulsarKafkaSchema<V> pulsarKeyKafkaSchema = (PulsarKafkaSchema<V>) valueSchema;
+ Deserializer valueDeserializer = getDeserializer((pulsarKeyKafkaSchema.getKafkaSerializer()));
+ value = (V) valueDeserializer.deserialize(topic, message.getValue());
+ } else {
+ value = valueSchema.decode(message.getValue());
+ }
try {
scheme = (Schema<byte[]>) FieldUtils.readField(message, "schema", true);
PulsarApi.MessageMetadata.Builder messageMetadataBuilder = ((MessageImpl<byte[]>)message).getMessageBuilder();
@@ -198,22 +207,28 @@
private String serializeKey(String topic, K key) {
// If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
- if (keySerializer instanceof StringSerializer) {
+ if (key instanceof String) {
return (String) key;
- } else {
- byte[] keyBytes = keySerializer.serialize(topic, key);
- return Base64.getEncoder().encodeToString(keyBytes);
}
+ if (keySchema instanceof PulsarKafkaSchema) {
+ ((PulsarKafkaSchema<K>) keySchema).setTopic(topic);
+ }
+ byte[] keyBytes = keySchema.encode(key);
+ return Base64.getEncoder().encodeToString(keyBytes);
}
private K deserializeKey(String topic, String key) {
- // If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
- if (keySerializer instanceof StringSerializer) {
- return (K) key;
- } else {
- Deserializer keyDeserializer = getDeserializer(keySerializer);
+ if (keySchema instanceof PulsarKafkaSchema) {
+ PulsarKafkaSchema<K> pulsarKeyKafkaSchema = (PulsarKafkaSchema<K>) keySchema;
+ // If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
+ if (pulsarKeyKafkaSchema.getKafkaSerializer() instanceof StringSerializer) {
+ return (K) key;
+ }
+
+ Deserializer keyDeserializer = getDeserializer(pulsarKeyKafkaSchema.getKafkaSerializer());
return (K) keyDeserializer.deserialize(topic, Base64.getDecoder().decode(key));
}
+ return keySchema.decode(Base64.getDecoder().decode(key));
}
/**
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
new file mode 100644
index 0000000..807f482
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class PulsarKafkaSchema<T> implements Schema<T> {
+
+ private final Serializer<T> kafkaSerializer;
+
+ private final Deserializer<T> kafkaDeserializer;
+
+ private String topic;
+
+ public PulsarKafkaSchema(Serializer<T> serializer) {
+ this(serializer, null);
+ }
+
+ public PulsarKafkaSchema(Deserializer<T> deserializer) {
+ this(null, deserializer);
+ }
+
+ public PulsarKafkaSchema(Serializer<T> serializer, Deserializer<T> deserializer) {
+ this.kafkaSerializer = serializer;
+ this.kafkaDeserializer = deserializer;
+ }
+
+ public Serializer<T> getKafkaSerializer() {
+ return kafkaSerializer;
+ }
+
+ public Deserializer<T> getKafkaDeserializer() {
+ return kafkaDeserializer;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public byte[] encode(T message) {
+ checkArgument(kafkaSerializer != null, "Kafka serializer is not initialized yet");
+ return kafkaSerializer.serialize(this.topic, message);
+ }
+
+ @Override
+ public T decode(byte[] message) {
+ checkArgument(kafkaDeserializer != null, "Kafka deserializer is not initialized yet");
+ return kafkaDeserializer.deserialize(this.topic, message);
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return Schema.BYTES.getSchemaInfo();
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 2c4af1e..1ded3c6 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -18,19 +18,23 @@
*/
package org.apache.kafka.clients.producer;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.avro.reflect.Nullable;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
-import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
-import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
@@ -64,6 +68,24 @@
@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
public class PulsarKafkaProducerTest {
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ public static class Foo {
+ @Nullable
+ private String field1;
+ @Nullable
+ private String field2;
+ private int field3;
+ }
+
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ public static class Bar {
+ private boolean field1;
+ }
+
@ObjectFactory
// Necessary to make PowerMockito.mockStatic work with TestNG.
public IObjectFactory getObjectFactory() {
@@ -103,7 +125,7 @@
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
- new PulsarKafkaProducer<>(properties, null, null);
+ new PulsarKafkaProducer<>(properties);
verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
@@ -149,7 +171,7 @@
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
// Act
- PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
+ PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties);
pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,"key", "value"));
@@ -157,6 +179,65 @@
verify(mockProducerBuilder, times(1)).intercept(anyVararg());
}
+ @Test
+ public void testPulsarKafkaSendAvro() throws PulsarClientException {
+ // Arrange
+ PulsarClient mockClient = mock(PulsarClient.class);
+ ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+ org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
+ ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+ CompletableFuture mockPartitionFuture = new CompletableFuture();
+ CompletableFuture mockSendAsyncFuture = new CompletableFuture();
+ TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);
+
+ mockPartitionFuture.complete(new ArrayList<>());
+ mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
+ doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+ doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
+ doReturn(mockClient).when(mockClientBuilder).build();
+ doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
+ doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
+ doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
+ doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(anyVararg());
+ doReturn(mockProducer).when(mockProducerBuilder).create();
+ doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
+ doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
+ PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+ PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+ when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+ when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+
+ Properties properties = new Properties();
+ List interceptors = new ArrayList();
+ interceptors.add("org.apache.kafka.clients.producer.PulsarKafkaProducerTest$PulsarKafkaProducerInterceptor");
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+ properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+ properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+ properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
+
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ // Act
+ PulsarKafkaProducer<Foo, Bar> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, fooSchema, barSchema);
+
+ Bar bar = new Bar();
+ bar.setField1(true);
+
+ Foo foo = new Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+
+ pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,foo, bar));
+
+ // Verify
+ verify(mockTypedMessageBuilder, times(1)).sendAsync();
+ verify(mockProducerBuilder, times(1)).intercept(anyVararg());
+ }
+
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
public void testPulsarKafkaProducerKeepAliveIntervalIllegalArgumentException() {
Properties properties = new Properties();
@@ -166,7 +247,7 @@
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, Long.toString((Integer.MAX_VALUE + 1L) * 1000));
- new PulsarKafkaProducer<>(properties, null, null);
+ new PulsarKafkaProducer<>(properties);
}
public static class PulsarKafkaProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String, String> {
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
index aadfce8..0f15691 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
@@ -36,6 +36,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.ProducerInterceptor;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.schema.BytesSchema;
@@ -97,9 +98,12 @@
}
}).when(mockInterceptor2).onSend(any(ProducerRecord.class));
+
+ Schema<String> pulsarKeySerializeSchema = new PulsarKafkaSchema<>(new StringSerializer());
+ Schema<byte[]> pulsarValueSerializeSchema = new PulsarKafkaSchema<>(new ByteArraySerializer());
ProducerInterceptors producerInterceptors = new ProducerInterceptors(Arrays.asList(new ProducerInterceptor[]{
- new KafkaProducerInterceptorWrapper(mockInterceptor1, new StringSerializer(), new ByteArraySerializer(), topic),
- new KafkaProducerInterceptorWrapper(mockInterceptor2, new StringSerializer(), new ByteArraySerializer(), topic)}));
+ new KafkaProducerInterceptorWrapper(mockInterceptor1, pulsarKeySerializeSchema, pulsarValueSerializeSchema, topic),
+ new KafkaProducerInterceptorWrapper(mockInterceptor2, pulsarKeySerializeSchema, pulsarValueSerializeSchema, topic)}));
TypedMessageBuilderImpl typedMessageBuilder = new TypedMessageBuilderImpl(null, new BytesSchema());
typedMessageBuilder.key("original key");