blob: 601f6df9f4db63a052974173bbc50be862298410 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.ons.open.trace.core.dispatch.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDataEncoder;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceTransferBean;
import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
import org.apache.rocketmq.remoting.RPCHook;
public class AsyncArrayDispatcher implements AsyncDispatcher {
private final static InternalLogger CLIENT_LOG = ClientLogger.getLog();
private final int queueSize;
private final int batchSize;
private DefaultMQProducer traceProducer;
private final ThreadPoolExecutor traceExecutor;
private AtomicLong discardCount;
private Thread worker;
private ArrayBlockingQueue<OnsTraceContext> traceContextQueue;
private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
private String dispatcherType;
private DefaultMQProducerImpl hostProducer;
private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
private String customizedTraceTopic;
/**
* Create AsyncArrayDispatcher with acl RPC hook.
*
* @param properties
* @param rpcHook RPC hook only can be set with AclRPCHook
*/
public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) {
dispatcherType = properties.getProperty(OnsTraceConstants.TraceDispatcherType);
this.customizedTraceTopic = properties.getProperty(OnsTraceConstants.CustomizedTraceTopic);
int queueSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.AsyncBufferSize, "2048"));
queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
this.queueSize = queueSize;
batchSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxBatchNum, "1"));
this.discardCount = new AtomicLong(0L);
traceContextQueue = new ArrayBlockingQueue<OnsTraceContext>(1024);
appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
this.traceExecutor = new ThreadPoolExecutor(//
10, //
20, //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
}
public DefaultMQProducerImpl getHostProducer() {
return hostProducer;
}
public void setHostProducer(DefaultMQProducerImpl hostProducer) {
this.hostProducer = hostProducer;
}
public DefaultMQPushConsumerImpl getHostConsumer() {
return hostConsumer;
}
public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
this.hostConsumer = hostConsumer;
}
@Override
public void start() throws MQClientException {
TraceProducerFactory.registerTraceDispatcher(dispatcherId);
this.worker = new ThreadFactoryImpl("MQ-AsyncArrayDispatcher-Thread-" + dispatcherId, true)
.newThread(new AsyncRunnable());
this.worker.start();
this.registerShutDownHook();
}
@Override
public void start(String nameServerAddresses) throws MQClientException {
this.traceProducer.setNamesrvAddr(nameServerAddresses);
this.start();
}
@Override
public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((OnsTraceContext) ctx);
if (!result) {
CLIENT_LOG.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
}
return result;
}
@Override
public void flush() throws IOException {
long end = System.currentTimeMillis() + 500;
while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
break;
}
}
CLIENT_LOG.info("------end trace send " + traceContextQueue.size() + " " + appenderQueue.size());
}
@Override
public void shutdown() {
this.stopped = true;
this.traceExecutor.shutdown();
TraceProducerFactory.unregisterTraceDispatcher(dispatcherId);
this.removeShutdownHook();
}
public void registerShutDownHook() {
if (shutDownHook == null) {
shutDownHook = new ThreadFactoryImpl("ShutdownHookMQTrace").newThread(new Runnable() {
private volatile boolean hasShutdown = false;
@Override
public void run() {
synchronized (this) {
if (!this.hasShutdown) {
try {
flush();
} catch (IOException e) {
CLIENT_LOG.error("system mqtrace hook shutdown failed ,maybe loss some trace data");
}
}
}
}
});
Runtime.getRuntime().addShutdownHook(shutDownHook);
}
}
public void removeShutdownHook() {
if (shutDownHook != null) {
Runtime.getRuntime().removeShutdownHook(shutDownHook);
}
}
class AsyncRunnable implements Runnable {
private boolean stopped;
@Override
public void run() {
while (!stopped) {
List<OnsTraceContext> contexts = new ArrayList<OnsTraceContext>(batchSize);
for (int i = 0; i < batchSize; i++) {
OnsTraceContext context = null;
try {
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (context != null) {
contexts.add(context);
} else {
break;
}
}
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecutor.submit(request);
} else if (AsyncArrayDispatcher.this.stopped) {
this.stopped = true;
}
}
}
}
class AsyncAppenderRequest implements Runnable {
List<OnsTraceContext> contextList;
public AsyncAppenderRequest(final List<OnsTraceContext> contextList) {
if (contextList != null) {
this.contextList = contextList;
} else {
this.contextList = new ArrayList<OnsTraceContext>(1);
}
}
@Override
public void run() {
sendTraceData(contextList);
}
public void sendTraceData(List<OnsTraceContext> contextList) {
Map<String, List<OnsTraceTransferBean>> transBeanMap = new HashMap<String, List<OnsTraceTransferBean>>(16);
String currentRegionId = null;
for (OnsTraceContext context : contextList) {
currentRegionId = context.getRegionId();
if (currentRegionId == null || context.getTraceBeans().isEmpty()) {
continue;
}
String topic = context.getTraceBeans().get(0).getTopic();
String key = topic + OnsTraceConstants.CONTENT_SPLITOR + currentRegionId;
List<OnsTraceTransferBean> transBeanList = transBeanMap.get(key);
if (transBeanList == null) {
transBeanList = new ArrayList<OnsTraceTransferBean>();
transBeanMap.put(key, transBeanList);
}
OnsTraceTransferBean traceData = OnsTraceDataEncoder.encoderFromContextBean(context);
transBeanList.add(traceData);
}
for (Map.Entry<String, List<OnsTraceTransferBean>> entry : transBeanMap.entrySet()) {
String[] key = entry.getKey().split(String.valueOf(OnsTraceConstants.CONTENT_SPLITOR));
flushData(entry.getValue(), key[0], key[1]);
}
}
private void flushData(List<OnsTraceTransferBean> transBeanList, String topic, String currentRegionId) {
if (transBeanList.size() == 0) {
return;
}
StringBuilder buffer = new StringBuilder(1024);
int count = 0;
Set<String> keySet = new HashSet<String>();
for (OnsTraceTransferBean bean : transBeanList) {
keySet.addAll(bean.getTransKey());
buffer.append(bean.getTransData());
count++;
if (buffer.length() >= traceProducer.getMaxMessageSize()) {
sendTraceDataByMQ(keySet, buffer.toString(), topic, currentRegionId);
buffer.delete(0, buffer.length());
keySet.clear();
count = 0;
}
}
if (count > 0) {
sendTraceDataByMQ(keySet, buffer.toString(), topic, currentRegionId);
}
transBeanList.clear();
}
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic,
String currentRegionId) {
String topic = customizedTraceTopic;
if (StringUtils.isBlank(topic)) {
topic = OnsTraceConstants.traceTopic + currentRegionId;
}
final Message message = new Message(topic, data.getBytes());
message.setKeys(keySet);
try {
Set<String> dataBrokerSet = getBrokerSetByTopic(dataTopic);
Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);
dataBrokerSet.retainAll(traceBrokerSet);
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
CLIENT_LOG.info("send trace data ,the traceData is: {} ", data, e);
}
};
if (dataBrokerSet.isEmpty()) {
//no cross set
traceProducer.send(message, callback, 5000);
} else {
traceProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Set<String> brokerSet = (Set<String>) arg;
List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
for (MessageQueue queue : mqs) {
if (brokerSet.contains(queue.getBrokerName())) {
filterMqs.add(queue);
}
}
int index = sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % filterMqs.size();
if (pos < 0) {
pos = 0;
}
return filterMqs.get(pos);
}
}, dataBrokerSet, callback);
}
} catch (Exception e) {
CLIENT_LOG.info("send trace data,the traceData is: {}", data, e);
}
}
private Set<String> getBrokerSetByTopic(String topic) {
Set<String> brokerSet = new HashSet<String>();
if (dispatcherType != null && dispatcherType.equals(OnsTraceDispatcherType.PRODUCER.name()) && hostProducer != null) {
brokerSet = tryGetMessageQueueBrokerSet(hostProducer, topic);
}
if (dispatcherType != null && dispatcherType.equals(OnsTraceDispatcherType.CONSUMER.name()) && hostConsumer != null) {
brokerSet = tryGetMessageQueueBrokerSet(hostConsumer, topic);
}
return brokerSet;
}
private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {
Set<String> brokerSet = new HashSet<String>();
String realTopic = NamespaceUtil.wrapNamespace(producer.getDefaultMQProducer().getNamespace(), topic);
TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(realTopic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
producer.getTopicPublishInfoTable().putIfAbsent(realTopic, new TopicPublishInfo());
producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(realTopic);
topicPublishInfo = producer.getTopicPublishInfoTable().get(realTopic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) {
brokerSet.add(queue.getBrokerName());
}
}
return brokerSet;
}
private Set<String> tryGetMessageQueueBrokerSet(DefaultMQPushConsumerImpl consumer, String topic) {
Set<String> brokerSet = new HashSet<String>();
try {
String realTopic = NamespaceUtil.wrapNamespace(consumer.getDefaultMQPushConsumer().getNamespace(), topic);
Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(realTopic);
for (MessageQueue queue : messageQueues) {
brokerSet.add(queue.getBrokerName());
}
} catch (MQClientException e) {
CLIENT_LOG.info("fetch message queue failed, the topic is {}", topic);
}
return brokerSet;
}
}
public String getCustomizedTraceTopic() {
return customizedTraceTopic;
}
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
public DefaultMQProducer getTraceProducer() {
return traceProducer;
}
}