/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.kafka.clients.producer;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
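
/**
 * A Kafka {@link Producer} facade backed by a Pulsar client. Each Kafka topic is mapped
 * lazily onto a dedicated Pulsar producer, and Kafka producer configuration (batching,
 * compression, timeouts, interceptors) is translated to its closest Pulsar equivalent.
 */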
public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
private final PulsarClient client;
private final ProducerBuilder<byte[]> pulsarProducerBuilder;
private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
private final Schema<K> keySchema;
private final Schema<V> valueSchema;
private final Partitioner partitioner;
private volatile Cluster cluster = Cluster.empty();
private List<ProducerInterceptor<K, V>> interceptors;
private final Properties properties;
public PulsarKafkaProducer(Map<String, Object> configs) {
this(new ProducerConfig(configs), null, null);
}
public PulsarKafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
this(new ProducerConfig(configs), new PulsarKafkaSchema<>(keySerializer), new PulsarKafkaSchema<>(valueSerializer));
}
public PulsarKafkaProducer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
this(new ProducerConfig(configs), keySchema, valueSchema);
}
public PulsarKafkaProducer(Properties properties) {
this(new ProducerConfig(properties), null, null);
}
public PulsarKafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(properties), new PulsarKafkaSchema<>(keySerializer), new PulsarKafkaSchema<>(valueSerializer));
}
public PulsarKafkaProducer(Properties properties, Schema<K> keySchema, Schema<V> valueSchema) {
this(new ProducerConfig(properties), keySchema, valueSchema);
}
@SuppressWarnings({ "unchecked", "deprecation" })
private PulsarKafkaProducer(ProducerConfig producerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
if (keySchema == null) {
Serializer<K> kafkaKeySerializer = producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
kafkaKeySerializer.configure(producerConfig.originals(), true);
this.keySchema = new PulsarKafkaSchema<>(kafkaKeySerializer);
} else {
this.keySchema = keySchema;
producerConfig.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
}
if (valueSchema == null) {
Serializer<V> kafkaValueSerializer = producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
kafkaValueSerializer.configure(producerConfig.originals(), false);
this.valueSchema = new PulsarKafkaSchema<>(kafkaValueSerializer);
} else {
this.valueSchema = valueSchema;
producerConfig.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
}
partitioner = producerConfig.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
partitioner.configure(producerConfig.originals());
this.properties = new Properties();
        properties.putAll(producerConfig.originals());
long keepAliveIntervalMs = Long.parseLong(properties.getProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "30000"));
String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
try {
            // Map Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG (milliseconds) onto
            // Pulsar's keep-alive interval (seconds). If the value in seconds exceeds
            // Integer.MAX_VALUE, Math.toIntExact throws ArithmeticException, which is
            // converted to IllegalArgumentException below.
int keepAliveInterval = Math.toIntExact(keepAliveIntervalMs / 1000);
client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).keepAliveInterval(keepAliveInterval, TimeUnit.SECONDS).build();
} catch (ArithmeticException e) {
            String errorMessage = String.format(
                    "Invalid value %d for 'connections.max.idle.ms'. Please use a value smaller than %d000 milliseconds.",
                    keepAliveIntervalMs, Integer.MAX_VALUE);
            logger.error(errorMessage);
            throw new IllegalArgumentException(errorMessage, e);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
pulsarProducerBuilder = PulsarProducerKafkaConfig.getProducerBuilder(client, properties);
        // To mimic Kafka's batching behavior, wait a short linger period so that messages
        // sent in quick succession can be batched together.
long lingerMs = Long.parseLong(properties.getProperty(ProducerConfig.LINGER_MS_CONFIG, "1"));
pulsarProducerBuilder.batchingMaxPublishDelay(lingerMs, TimeUnit.MILLISECONDS);
        String compressionType = properties.getProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG);
        // Only gzip and lz4 have Pulsar equivalents here; other Kafka compression
        // types fall through to the Pulsar client's default (no compression).
        if ("gzip".equals(compressionType)) {
            pulsarProducerBuilder.compressionType(CompressionType.ZLIB);
        } else if ("lz4".equals(compressionType)) {
            pulsarProducerBuilder.compressionType(CompressionType.LZ4);
        }
pulsarProducerBuilder.messageRouter(new KafkaMessageRouter(lingerMs));
int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"));
pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS);
boolean blockOnBufferFull = Boolean
.parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false"));
        // Kafka's blocking semantics differ from the Pulsar client's: with blockIfQueueFull=false
        // Pulsar fails immediately when the queue is full, whereas Kafka (even with
        // block.on.buffer.full=false) still blocks for up to "max.block.ms" before failing.
        boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0 || blockOnBufferFull;
pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
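        // Instantiate any configured Kafka ProducerInterceptors; they are wrapped as
        // Pulsar interceptors per topic in createNewProducer().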
interceptors = (List) producerConfig.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
}
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
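
    /**
     * Sends a record, lazily creating a Pulsar producer for the record's topic on first
     * use. The returned future completes with Kafka-style {@link RecordMetadata} derived
     * from the Pulsar {@link MessageId}; the optional callback is invoked either way.
     */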
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
org.apache.pulsar.client.api.Producer<byte[]> producer;
try {
            producer = producers.computeIfAbsent(record.topic(), this::createNewProducer);
} catch (Exception e) {
if (callback != null) {
callback.onCompletion(null, e);
}
CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
int messageSize = buildMessage(messageBuilder, record);
CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
messageBuilder.sendAsync().thenAccept((messageId) -> {
future.complete(getRecordMetadata(record.topic(), messageBuilder, messageId, messageSize));
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
        future.handle((recordMetadata, throwable) -> {
            if (callback != null) {
                // Unwrap the CompletionException added by the async pipeline so the
                // callback sees the underlying failure, as a Kafka callback would.
                Exception exception = throwable == null ? null
                        : throwable.getCause() instanceof Exception ? (Exception) throwable.getCause()
                        : new Exception(throwable);
                callback.onCompletion(recordMetadata, exception);
            }
            return null;
        });
return future;
}
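
    /** Flushes all per-topic Pulsar producers in parallel and waits for all to complete. */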
@Override
public void flush() {
producers.values().stream()
.map(p -> p.flushAsync())
.collect(Collectors.toList())
.forEach(CompletableFuture::join);
}
@Override
public List<PartitionInfo> partitionsFor(String topic) {
throw new UnsupportedOperationException();
}
@Override
public Map<MetricName, ? extends Metric> metrics() {
return Collections.emptyMap();
}
@Override
public void close() {
close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
partitioner.close();
}
@Override
public void close(long timeout, TimeUnit unit) {
try {
client.closeAsync().get(timeout, unit);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
}
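
    /**
     * Creates the Pulsar producer for a topic: records the topic's partitions in the
     * cached {@link Cluster} view (used by the Kafka partitioner), then clones the shared
     * builder and attaches the configured Kafka interceptors wrapped as Pulsar interceptors.
     */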
private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
try {
// Add the partitions info for the new topic
synchronized (this){
cluster = cluster.withPartitions(readPartitionsInfo(topic));
}
List<org.apache.pulsar.client.api.ProducerInterceptor> wrappedInterceptors = interceptors.stream()
.map(interceptor -> new KafkaProducerInterceptorWrapper(interceptor, keySchema, valueSchema, topic))
.collect(Collectors.toList());
return pulsarProducerBuilder.clone()
.topic(topic)
                    .intercept(wrappedInterceptors.toArray(new org.apache.pulsar.client.api.ProducerInterceptor[0]))
.create();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
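
    /**
     * Queries Pulsar for the topic's partitions and builds Kafka {@link PartitionInfo}
     * entries; leader, replica, and ISR fields are left null because Pulsar does not
     * expose Kafka-style broker nodes.
     */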
private Map<TopicPartition, PartitionInfo> readPartitionsInfo(String topic) {
List<String> partitions = client.getPartitionsForTopic(topic).join();
Map<TopicPartition, PartitionInfo> partitionsInfo = new HashMap<>();
for (int i = 0; i < partitions.size(); i++) {
TopicPartition tp = new TopicPartition(topic, i);
PartitionInfo pi = new PartitionInfo(topic, i, null, null, null);
partitionsInfo.put(tp, pi);
}
return partitionsInfo;
}
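
    /**
     * Copies the Kafka record onto the Pulsar message builder: the key as a string (see
     * {@link #getKey}), the record timestamp as event time, the serialized value, and the
     * target partition as a message property consumed by {@link KafkaMessageRouter}.
     * Returns the serialized value size in bytes.
     */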
private int buildMessage(TypedMessageBuilder<byte[]> builder, ProducerRecord<K, V> record) {
byte[] keyBytes = null;
if (record.key() != null) {
String key = getKey(record.topic(), record.key());
keyBytes = key.getBytes(StandardCharsets.UTF_8);
builder.key(key);
}
if (record.timestamp() != null) {
builder.eventTime(record.timestamp());
}
if (valueSchema instanceof PulsarKafkaSchema) {
((PulsarKafkaSchema<V>) valueSchema).setTopic(record.topic());
}
byte[] value = valueSchema.encode(record.value());
builder.value(value);
if (record.partition() != null) {
// Partition was explicitly set on the record
builder.property(KafkaMessageRouter.PARTITION_ID, record.partition().toString());
} else {
// Get the partition id from the partitioner
int partition = partitioner.partition(record.topic(), record.key(), keyBytes, record.value(), value, cluster);
builder.property(KafkaMessageRouter.PARTITION_ID, Integer.toString(partition));
}
return value.length;
}
private String getKey(String topic, K key) {
        // If the key is a String, use it as-is; otherwise serialize it to byte[] and Base64-encode it.
if (key instanceof String) {
return (String) key;
}
if (keySchema instanceof PulsarKafkaSchema) {
((PulsarKafkaSchema) keySchema).setTopic(topic);
}
byte[] keyBytes = keySchema.encode(key);
return Base64.getEncoder().encodeToString(keyBytes);
}
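
    /**
     * Builds Kafka {@link RecordMetadata} from the Pulsar {@link MessageId}: ledger id and
     * entry id are combined into a single Kafka-style offset, and the partition index is
     * taken from the message id.
     */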
private RecordMetadata getRecordMetadata(String topic, TypedMessageBuilder<byte[]> msgBuilder, MessageId messageId,
int size) {
MessageIdImpl msgId = (MessageIdImpl) messageId;
// Combine ledger id and entry id to form offset
long offset = MessageIdUtils.getOffset(msgId);
int partition = msgId.getPartitionIndex();
TopicPartition tp = new TopicPartition(topic, partition);
TypedMessageBuilderImpl<byte[]> mb = (TypedMessageBuilderImpl<byte[]>) msgBuilder;
return new RecordMetadata(tp, offset, 0, mb.getPublishTime(), 0, mb.hasKey() ? mb.getKey().length() : 0, size);
}
    /**
     * Reflectively instantiates a Kafka {@link ProducerInterceptor} by class name; the
     * class is assumed to have an accessible no-arg constructor.
     */
    private ProducerInterceptor createKafkaProducerInterceptor(String clazz) {
        try {
            return (ProducerInterceptor) Class.forName(clazz).getDeclaredConstructor().newInstance();
        } catch (ClassNotFoundException e) {
            String errorMessage = "Can't find interceptor class: " + e.getMessage();
            logger.error(errorMessage);
            throw new RuntimeException(errorMessage, e);
        } catch (ReflectiveOperationException e) {
            String errorMessage = "Can't instantiate provided interceptor class: " + e.getMessage();
            logger.error(errorMessage);
            throw new RuntimeException(errorMessage, e);
        }
    }
private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
}