// blob: 1800c5550a5b1903139d758a6370704ae9b622b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.dataproxy.sink.mq.pulsar;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.sink.common.EventHandler;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
import org.apache.inlong.dataproxy.sink.mq.OrderBatchPackProfileV0;
import org.apache.inlong.dataproxy.sink.mq.SimpleBatchPackProfileV0;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.SecureRandom;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import static org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_STATS_INTERVAL_SECONDS;
/**
 * PulsarHandler
 *
 * <p>{@link MessageQueueHandler} implementation that publishes {@link BatchPackProfile} events to a
 * Pulsar cluster. One {@link PulsarClient} is built in {@link #start()}; producers are created
 * lazily per topic on the first {@link #send(BatchPackProfile)} for that topic and cached until
 * {@link #stop()}.
 *
 * <p>Send results are reported asynchronously: the completion callback of each publish updates the
 * sink metrics, and either acknowledges the event or hands it to
 * {@code sinkContext.processSendFail}.
 */
public class PulsarHandler implements MessageQueueHandler {

    public static final Logger LOG = LoggerFactory.getLogger(PulsarHandler.class);
    // log print count
    private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
    // Shared, self-seeded generator for the random suffix of producer names.
    private static final SecureRandom PRODUCER_NAME_RANDOM = new SecureRandom();

    public static final String KEY_TENANT = "tenant";
    public static final String KEY_NAMESPACE = "namespace";
    public static final String KEY_SERVICE_URL = "serviceUrl";
    public static final String KEY_AUTHENTICATION = "authentication";
    public static final String KEY_ENABLEBATCHING = "enableBatching";
    public static final String KEY_BATCHINGMAXBYTES = "batchingMaxBytes";
    public static final String KEY_BATCHINGMAXMESSAGES = "batchingMaxMessages";
    public static final String KEY_BATCHINGMAXPUBLISHDELAY = "batchingMaxPublishDelay";
    public static final String KEY_MAXPENDINGMESSAGES = "maxPendingMessages";
    public static final String KEY_MAXPENDINGMESSAGESACROSSPARTITIONS = "maxPendingMessagesAcrossPartitions";
    public static final String KEY_SENDTIMEOUT = "sendTimeout";
    public static final String KEY_COMPRESSIONTYPE = "compressionType";
    public static final String KEY_BLOCKIFQUEUEFULL = "blockIfQueueFull";
    public static final String KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY = "roundRobinRouter"
            + "BatchingPartitionSwitchFrequency";
    public static final String KEY_IOTHREADS = "ioThreads";
    public static final String KEY_MEMORYLIMIT = "memoryLimit";
    public static final String KEY_CONNECTIONSPERBROKER = "connectionsPerBroker";

    private CacheClusterConfig config;
    private String clusterName;
    private MessageQueueZoneSinkContext sinkContext;
    private String tenant;
    private String namespace;
    // One EventHandler per sending thread, created on first use in sendProfileV1.
    private final ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();

    /**
     * pulsar client
     */
    private PulsarClient client;
    // Template builder; cloned for every new topic producer.
    private ProducerBuilder<byte[]> baseBuilder;
    // topic name -> producer
    private final ConcurrentHashMap<String, Producer<byte[]>> producerMap = new ConcurrentHashMap<>();

    /**
     * init
     *
     * <p>Stores the cluster configuration and sink context and reads the {@code tenant} and
     * {@code namespace} parameters. No connection is opened here; see {@link #start()}.
     *
     * @param config      cache cluster configuration (cluster name, params, token)
     * @param sinkContext sink context providing producer settings, metrics and the dispatch queue
     */
    public void init(CacheClusterConfig config, MessageQueueZoneSinkContext sinkContext) {
        this.config = config;
        this.clusterName = config.getClusterName();
        this.sinkContext = sinkContext;
        this.tenant = config.getParams().get(KEY_TENANT);
        this.namespace = config.getParams().get(KEY_NAMESPACE);
    }

    /**
     * start
     *
     * <p>Builds the Pulsar client and the template producer builder from the cluster params and the
     * producer context. Any failure is logged and not rethrown; the handler is then left without a
     * usable client or builder and subsequent sends are reported as failed.
     */
    @Override
    public void start() {
        try {
            String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
            // An explicit "authentication" param wins; otherwise fall back to the cluster token.
            String authentication = config.getParams().get(KEY_AUTHENTICATION);
            if (StringUtils.isEmpty(authentication)) {
                authentication = config.getToken();
            }
            Context context = sinkContext.getProducerContext();
            ClientBuilder builder = PulsarClient.builder();
            if (StringUtils.isNotEmpty(authentication)) {
                builder.authentication(AuthenticationFactory.token(authentication));
            }
            // create pulsar client
            this.client = builder
                    .serviceUrl(serviceUrl)
                    .ioThreads(context.getInteger(KEY_IOTHREADS, 1))
                    .memoryLimit(context.getLong(KEY_MEMORYLIMIT, 1073741824L), SizeUnit.BYTES)
                    .connectionsPerBroker(context.getInteger(KEY_CONNECTIONSPERBROKER, 10))
                    .statsInterval(NumberUtils.toLong(config.getParams().get(KEY_STATS_INTERVAL_SECONDS), -1),
                            TimeUnit.SECONDS)
                    .build();
            // create the template producer builder
            this.baseBuilder = client.newProducer();
            this.baseBuilder
                    .sendTimeout(context.getInteger(KEY_SENDTIMEOUT, 0), TimeUnit.MILLISECONDS)
                    .maxPendingMessages(context.getInteger(KEY_MAXPENDINGMESSAGES, 500))
                    .maxPendingMessagesAcrossPartitions(
                            context.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 60000));
            this.baseBuilder
                    .batchingMaxMessages(context.getInteger(KEY_BATCHINGMAXMESSAGES, 500))
                    .batchingMaxPublishDelay(context.getInteger(KEY_BATCHINGMAXPUBLISHDELAY, 100),
                            TimeUnit.MILLISECONDS)
                    .batchingMaxBytes(context.getInteger(KEY_BATCHINGMAXBYTES, 131072));
            this.baseBuilder
                    .accessMode(ProducerAccessMode.Shared)
                    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                    .blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL, true));
            this.baseBuilder
                    .roundRobinRouterBatchingPartitionSwitchFrequency(
                            context.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 60))
                    .enableBatching(context.getBoolean(KEY_ENABLEBATCHING, true))
                    .compressionType(this.getPulsarCompressionType());
            LOG.info("pulsar handler started");
        } catch (Throwable e) {
            // Do not claim a successful start when initialization failed.
            LOG.error("pulsar handler start failed, cluster:" + clusterName + ", error:" + e.getMessage(), e);
        }
    }

    /**
     * stop
     *
     * <p>Closes every cached producer, forgets them, and then closes the client if one was created.
     * Close failures are logged and do not prevent the remaining resources from being closed.
     */
    @Override
    public void stop() {
        for (Entry<String, Producer<byte[]>> entry : this.producerMap.entrySet()) {
            try {
                entry.getValue().close();
            } catch (PulsarClientException e) {
                LOG.error(e.getMessage(), e);
            }
        }
        // Drop the closed producers so they are never handed out again.
        this.producerMap.clear();
        // start() may have failed before the client was built.
        if (this.client != null) {
            try {
                this.client.close();
            } catch (PulsarClientException e) {
                LOG.error(e.getMessage(), e);
            }
        }
        LOG.info("pulsar handler stopped");
    }

    /**
     * publishTopic
     *
     * <p>No-op for Pulsar: producers are created on demand in {@link #send(BatchPackProfile)}.
     *
     * @param topicSet ignored
     */
    @Override
    public void publishTopic(Set<String> topicSet) {
        //
    }

    /**
     * send
     *
     * <p>Resolves the topic for the event, obtains (or creates) the topic producer and publishes the
     * event asynchronously.
     *
     * @param event the batch profile to publish
     * @return {@code true} if the asynchronous publish was issued; {@code false} if the event was
     *         rejected or failed before being handed to the producer. A {@code true} result does not
     *         mean the broker accepted the message; that outcome is reported by the callback.
     */
    @Override
    public boolean send(BatchPackProfile event) {
        try {
            // idConfig
            IdTopicConfig idConfig = ConfigManager.getInstance().getIdTopicConfig(
                    event.getInlongGroupId(), event.getInlongStreamId());
            if (idConfig == null) {
                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOUID);
                sinkContext.addSendResultMetric(event, clusterName, event.getUid(), false, 0);
                sinkContext.getDispatchQueue().release(event.getSize());
                event.fail();
                return false;
            }
            // topic
            String producerTopic = idConfig.getPulsarTopicName(tenant, namespace);
            if (producerTopic == null) {
                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
                sinkContext.addSendResultMetric(event, clusterName, event.getUid(), false, 0);
                sinkContext.getDispatchQueue().release(event.getSize());
                event.fail();
                return false;
            }
            // get producer
            Producer<byte[]> producer = this.getOrCreateProducer(producerTopic);
            // create producer failed
            if (producer == null) {
                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOPRODUCER);
                sinkContext.processSendFail(event, clusterName, producerTopic, 0);
                return false;
            }
            // send
            if (event instanceof SimpleBatchPackProfileV0) {
                this.sendSimpleProfileV0((SimpleBatchPackProfileV0) event, idConfig, producer, producerTopic);
            } else if (event instanceof OrderBatchPackProfileV0) {
                this.sendOrderProfileV0((OrderBatchPackProfileV0) event, idConfig, producer, producerTopic);
            } else {
                this.sendProfileV1(event, idConfig, producer, producerTopic);
            }
            return true;
        } catch (Exception e) {
            sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SENDEXCEPT);
            sinkContext.processSendFail(event, clusterName, event.getUid(), 0);
            LOG.error(e.getMessage(), e);
            return false;
        }
    }

    /**
     * getOrCreateProducer
     *
     * <p>Returns the cached producer for the topic, creating and registering one if none exists.
     * When two threads create a producer for the same topic concurrently, the one registered first
     * is returned to both and the duplicate is closed.
     *
     * @param producerTopic full Pulsar topic name
     * @return the producer registered for the topic, or {@code null} if creation failed
     */
    private Producer<byte[]> getOrCreateProducer(String producerTopic) {
        Producer<byte[]> cached = this.producerMap.get(producerTopic);
        if (cached != null) {
            return cached;
        }
        Producer<byte[]> created;
        try {
            LOG.info("try to new a object for topic " + producerTopic);
            // Random suffix keeps names distinct between producers of the same topic.
            String producerName = producerTopic + "-" + PRODUCER_NAME_RANDOM.nextLong();
            created = baseBuilder.clone().topic(producerTopic)
                    .producerName(producerName)
                    .create();
            LOG.info("create new producer success:{}", created.getProducerName());
        } catch (Throwable ex) {
            LOG.error("create new producer failed", ex);
            return null;
        }
        Producer<byte[]> existing = this.producerMap.putIfAbsent(producerTopic, created);
        if (existing == null) {
            return created;
        }
        // Another thread registered a producer first: always use the registered one, and close the
        // duplicate on a best-effort basis so a close failure cannot leave it in use.
        try {
            created.close();
            LOG.info("close producer success:{}", created.getProducerName());
        } catch (Throwable ex) {
            LOG.error("close duplicate producer failed", ex);
        }
        return existing;
    }

    /**
     * getPulsarCompressionType
     *
     * <p>Reads {@code compressionType} from the producer context (default {@code SNAPPY}) and maps
     * it, ignoring case and surrounding whitespace, to a {@link CompressionType}. Unrecognized
     * values fall back to {@link CompressionType#NONE} and are logged.
     *
     * @return CompressionType
     */
    private CompressionType getPulsarCompressionType() {
        Context context = sinkContext.getProducerContext();
        String type = StringUtils.trim(context.getString(KEY_COMPRESSIONTYPE, CompressionType.SNAPPY.name()));
        for (CompressionType candidate : CompressionType.values()) {
            if (candidate.name().equalsIgnoreCase(type)) {
                return candidate;
            }
        }
        LOG.warn("unknown pulsar compressionType:{}, use {} instead", type, CompressionType.NONE);
        return CompressionType.NONE;
    }

    /**
     * sendProfileV1
     *
     * <p>Encodes headers and body through the calling thread's {@link EventHandler} and publishes
     * them.
     */
    private void sendProfileV1(BatchPackProfile event, IdTopicConfig idConfig, Producer<byte[]> producer,
            String producerTopic) throws Exception {
        EventHandler handler = handlerLocal.get();
        if (handler == null) {
            handler = this.sinkContext.createEventHandler();
            handlerLocal.set(handler);
        }
        // headers
        Map<String, String> headers = handler.parseHeader(idConfig, event, sinkContext.getNodeId(),
                sinkContext.getCompressType());
        // compress
        byte[] bodyBytes = handler.parseBody(idConfig, event, sinkContext.getCompressType());
        this.publishAsync(event, producer, producerTopic, headers, bodyBytes,
                "Send ProfileV1 to Pulsar failure", null);
    }

    /**
     * sendSimpleProfileV0
     *
     * <p>Publishes the body of the wrapped simple profile, using the event properties as headers
     * and falling back to the profile's own headers when the properties are empty.
     */
    private void sendSimpleProfileV0(SimpleBatchPackProfileV0 event, IdTopicConfig idConfig,
            Producer<byte[]> producer,
            String producerTopic) throws Exception {
        // headers
        Map<String, String> headers = event.getProperties();
        if (MapUtils.isEmpty(headers)) {
            headers = event.getSimpleProfile().getHeaders();
        }
        // body
        byte[] bodyBytes = event.getSimpleProfile().getBody();
        this.publishAsync(event, producer, producerTopic, headers, bodyBytes,
                "Send SimpleProfileV0 to Pulsar failure", null);
    }

    /**
     * sendOrderProfileV0
     *
     * <p>Publishes the headers and body of the wrapped order profile; on success the event is
     * acknowledged and then {@code ackOrder()} is invoked.
     */
    private void sendOrderProfileV0(OrderBatchPackProfileV0 event, IdTopicConfig idConfig, Producer<byte[]> producer,
            String producerTopic) throws Exception {
        // headers
        Map<String, String> headers = event.getOrderProfile().getHeaders();
        // body
        byte[] bodyBytes = event.getOrderProfile().getBody();
        this.publishAsync(event, producer, producerTopic, headers, bodyBytes,
                "Send OrderProfileV0 to Pulsar failure", event::ackOrder);
    }

    /**
     * publishAsync
     *
     * <p>Records the send metric, issues the asynchronous publish and registers the completion
     * callback shared by all profile types.
     *
     * @param event          event being published
     * @param producer       producer of the target topic
     * @param producerTopic  target topic, used for metrics
     * @param headers        message properties
     * @param bodyBytes      message payload
     * @param failureMessage log message used when the publish completes exceptionally
     * @param afterAck       extra action run after {@code event.ack()} on success; may be {@code null}
     */
    private void publishAsync(BatchPackProfile event, Producer<byte[]> producer, String producerTopic,
            Map<String, String> headers, byte[] bodyBytes, String failureMessage, Runnable afterAck)
            throws Exception {
        // metric
        sinkContext.addSendMetric(event, clusterName, producerTopic, bodyBytes.length);
        // sendAsync
        long sendTime = System.currentTimeMillis();
        CompletableFuture<MessageId> future = producer.newMessage().properties(headers)
                .value(bodyBytes).sendAsync();
        // callback
        future.whenCompleteAsync((msgId, ex) -> {
            if (ex != null) {
                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
                sinkContext.processSendFail(event, clusterName, producerTopic, sendTime);
                LOG.error(failureMessage, ex);
            } else {
                sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
                sinkContext.addSendResultMetric(event, clusterName, producerTopic, true, sendTime);
                sinkContext.getDispatchQueue().release(event.getSize());
                event.ack();
                if (afterAck != null) {
                    afterAck.run();
                }
            }
        });
    }
}