blob: bec2c354b95ec3fdaa282d402c6684485527d335 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.kafka.clients.producer;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import com.google.common.annotations.VisibleForTesting;
//Questions
/**
* 1. What's the auth method
* 2. What if message publish fails with async
*/
import kafka.javaapi.producer.Producer;
import kafka.producer.DefaultPartitioner;
import kafka.producer.KeyedMessage;
import kafka.producer.Partitioner;
import kafka.producer.ProducerConfig;
import kafka.serializer.Encoder;
import kafka.serializer.StringEncoder;
import kafka.utils.VerifiableProperties;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PulsarKafkaProducer<K, V> extends Producer<K, V> {
private final PulsarClient client;
private final ProducerBuilder<byte[]> pulsarProducerBuilder;
private final Partitioner partitioner;
private final Encoder<K> keySerializer;
private final Encoder<V> valueSerializer;
private final boolean isSendAsync;
public static String KAFKA_KEY_MAX_QUEUE_BUFFERING_TIME_MS = "queue.buffering.max.ms";
public static String KAFKA_KEY_MAX_QUEUE_BUFFERING_MESSAGES = "queue.buffering.max.messages";
public static String KAFKA_KEY_MAX_BATCH_MESSAGES = "batch.num.messages";
public static String KAFKA_KEY_REQUEST_TIMEOUT_MS = "request.timeout.ms";
private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
@SuppressWarnings({ "unchecked", "rawtypes" })
public PulsarKafkaProducer(ProducerConfig config) {
super((kafka.producer.Producer) null);
partitioner = config.partitionerClass() != null
? newInstance(config.partitionerClass(), Partitioner.class, config.props())
: new DefaultPartitioner(config.props());
// kafka-config returns default serializer if client doesn't configure it
checkNotNull(config.keySerializerClass(), "key-serializer class can't be null");
checkNotNull(config.serializerClass(), "value-serializer class can't be null");
keySerializer = newInstance(config.keySerializerClass(), Encoder.class, config.props());
valueSerializer = newInstance(config.serializerClass(), Encoder.class, config.props());
Properties properties = config.props() != null && config.props().props() != null ? config.props().props()
: new Properties();
String serviceUrl = config.brokerList();
try {
client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
} catch (PulsarClientException e) {
throw new IllegalArgumentException(
"Failed to create pulsar-client using url = " + serviceUrl + ", properties = " + properties, e);
}
pulsarProducerBuilder = client.newProducer();
// doc: https://kafka.apache.org/08/documentation.html#producerapi
// api-doc:
// https://github.com/apache/kafka/blob/0.8.2.2/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
// queue.enqueue.timeout.ms: The amount of time to block before dropping messages when running in async mode and
// the buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued immediately or
// dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block
// indefinitely and never willingly drop a send.
boolean blockIfQueueFull = config.queueEnqueueTimeoutMs() == -1 ? true : false;
// This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values
// are (1) async for asynchronous send and (2) sync for synchronous send. By setting the producer to async we
// allow batching together of requests (which is great for throughput) but open the possibility of a failure of
// the client machine dropping unsent data.
isSendAsync = "async".equalsIgnoreCase(config.producerType());
CompressionType compressionType = CompressionType.NONE;
// Valid values are "none", "gzip" and "snappy".
if ("gzip".equals(config.compressionCodec().name())) {
compressionType = CompressionType.ZLIB;
} else if ("snappy".equals(config.compressionCodec().name())) {
compressionType = CompressionType.SNAPPY;
}
long batchDelayMs = config.queueBufferingMaxMs();
if (properties.containsKey(KAFKA_KEY_MAX_QUEUE_BUFFERING_MESSAGES)) {
pulsarProducerBuilder.maxPendingMessages(config.queueBufferingMaxMessages());
}
if (properties.containsKey(KAFKA_KEY_MAX_BATCH_MESSAGES)) {
pulsarProducerBuilder.batchingMaxMessages(config.batchNumMessages());
}
if (properties.containsKey(KAFKA_KEY_MAX_QUEUE_BUFFERING_TIME_MS)) {
pulsarProducerBuilder.batchingMaxPublishDelay(batchDelayMs, TimeUnit.MILLISECONDS);
}
if (properties.containsKey(KAFKA_KEY_REQUEST_TIMEOUT_MS)) {
pulsarProducerBuilder.sendTimeout(config.requestTimeoutMs(), TimeUnit.MILLISECONDS);
}
pulsarProducerBuilder.blockIfQueueFull(blockIfQueueFull).compressionType(compressionType);
}
public PulsarKafkaProducer(kafka.producer.Producer<K, V> producer) {
this(producer.config());
}
@Override
public void send(KeyedMessage<K, V> message) {
org.apache.pulsar.client.api.Producer<byte[]> producer;
try {
producer = producers.computeIfAbsent(message.topic(), topic -> createNewProducer(topic));
} catch (Exception e) {
throw new IllegalArgumentException("Failed to create producer for " + message.topic(), e);
}
TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
buildMessage(messageBuilder, message);
if (isSendAsync) {
// what if message publish fails:
// according to : https://kafka.apache.org/08/documentation.html#producerapi
// async: opens the possibility of a failure of the client machine dropping unsent data
messageBuilder.sendAsync().handle((res, ex) -> {
if (ex != null) {
log.warn("publish failed for {}", producer.getTopic(), ex);
}
return null;
});
} else {
try {
messageBuilder.send();
} catch (PulsarClientException e) {
log.warn("publish failed for {}", producer.getTopic(), e);
throw new IllegalStateException("Failed to publish message " + message.topic(), e);
}
}
}
@Override
public void send(List<KeyedMessage<K, V>> messages) {
if (messages != null) {
messages.forEach(this::send);
}
}
private void buildMessage(TypedMessageBuilder<byte[]> builder, KeyedMessage<K, V> message) {
if (message.key() != null) {
String key = getKey(message.topic(), message.key());
builder.key(key);
}
byte[] value = valueSerializer.toBytes(message.message());
builder.value(value);
}
private String getKey(String topic, K key) {
// If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
if (keySerializer!=null && keySerializer instanceof StringEncoder) {
return (String) key;
} else {
byte[] keyBytes = keySerializer.toBytes(key);
return Base64.getEncoder().encodeToString(keyBytes);
}
}
private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
try {
pulsarProducerBuilder.messageRoutingMode(MessageRoutingMode.CustomPartition);
pulsarProducerBuilder.messageRouter(new MessageRouter() {
private static final long serialVersionUID = 1L;
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
// https://kafka.apache.org/08/documentation.html#producerapi
// The default partitioner is based on the hash of the key.
return partitioner.partition(msg.getKey(), metadata.numPartitions());
}
});
log.info("Creating producer for topic {} with config {}", topic, pulsarProducerBuilder.toString());
return pulsarProducerBuilder.clone().topic(topic).create();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
public <T> T newInstance(String key, Class<T> t, VerifiableProperties properties) {
Class<?> c = null;
try {
c = Class.forName(key);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("class not found for :" + key);
}
if (c == null)
return null;
Object o = newInstance(c, properties);
if (!t.isInstance(o)) {
throw new IllegalArgumentException(c.getName() + " is not an instance of " + t.getName());
}
return t.cast(o);
}
public static <T> T newInstance(Class<T> c, VerifiableProperties properties) {
try {
try {
Constructor<T> constructor = c.getConstructor(VerifiableProperties.class);
constructor.setAccessible(true);
return constructor.newInstance(properties);
} catch (Exception e) {
// Ok.. not a default implementation class
}
return c.newInstance();
} catch (IllegalAccessException e) {
throw new IllegalArgumentException("Could not instantiate class " + c.getName(), e);
} catch (InstantiationException e) {
throw new IllegalArgumentException(
"Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?",
e);
} catch (NullPointerException e) {
throw new IllegalArgumentException("Requested class was null", e);
}
}
@Override
public void close() {
if (producers != null) {
producers.forEach((topic, producer) -> {
try {
producer.close();
} catch (PulsarClientException e) {
log.warn("Failed to close producer for {}", topic, e.getMessage());
}
});
}
if (client != null) {
try {
client.close();
} catch (PulsarClientException e) {
log.warn("Failed to close pulsar-client {}", e.getMessage());
}
}
}
@VisibleForTesting
public ProducerBuilder<byte[]> getPulsarProducerBuilder() {
return pulsarProducerBuilder;
}
@VisibleForTesting
public Partitioner getPartitioner() {
return partitioner;
}
@VisibleForTesting
public Encoder<K> getKeySerializer() {
return keySerializer;
}
@VisibleForTesting
public Encoder<V> getValueSerializer() {
return valueSerializer;
}
}