chore(all):polish the code and modify the version
diff --git a/pom.xml b/pom.xml
index f8acf1d..827ae1a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-all</artifactId>
- <version>2.0.5-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache RocketMQ Spring Boot ${project.version}</name>
diff --git a/rocketmq-spring-boot-parent/pom.xml b/rocketmq-spring-boot-parent/pom.xml
index ca490b0..bf62ce4 100644
--- a/rocketmq-spring-boot-parent/pom.xml
+++ b/rocketmq-spring-boot-parent/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-all</artifactId>
- <version>2.0.5-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -38,7 +38,7 @@
<spring.boot.version>2.0.5.RELEASE</spring.boot.version>
<spring.version>5.1.0.RELEASE</spring.version>
- <rocketmq.spring.boot.version>2.0.5-SNAPSHOT</rocketmq.spring.boot.version>
+ <rocketmq.spring.boot.version>2.1.0-SNAPSHOT</rocketmq.spring.boot.version>
<rocketmq-version>4.6.0</rocketmq-version>
<slf4j.version>1.7.25</slf4j.version>
diff --git a/rocketmq-spring-boot-samples/pom.xml b/rocketmq-spring-boot-samples/pom.xml
index 983a2d4..1787afd 100644
--- a/rocketmq-spring-boot-samples/pom.xml
+++ b/rocketmq-spring-boot-samples/pom.xml
@@ -38,7 +38,7 @@
</modules>
<properties>
- <rocketmq-spring-boot-starter-version>2.0.5-SNAPSHOT</rocketmq-spring-boot-starter-version>
+ <rocketmq-spring-boot-starter-version>2.1.0-SNAPSHOT</rocketmq-spring-boot-starter-version>
</properties>
<dependencies>
diff --git a/rocketmq-spring-boot-starter/pom.xml b/rocketmq-spring-boot-starter/pom.xml
index 697ab16..71ea5cd 100644
--- a/rocketmq-spring-boot-starter/pom.xml
+++ b/rocketmq-spring-boot-starter/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-parent</artifactId>
- <version>2.0.5-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath>
</parent>
@@ -30,7 +30,7 @@
<packaging>jar</packaging>
<name>RocketMQ Spring Boot Starter</name>
- <description>SRocketMQ Spring Boot Starter</description>
+ <description>RocketMQ Spring Boot Starter</description>
<url>https://github.com/apache/rocketmq-spring</url>
<dependencies>
diff --git a/rocketmq-spring-boot/pom.xml b/rocketmq-spring-boot/pom.xml
index d5d5505..cd1d2a4 100644
--- a/rocketmq-spring-boot/pom.xml
+++ b/rocketmq-spring-boot/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-parent</artifactId>
- <version>2.0.5-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath>
</parent>
@@ -30,7 +30,7 @@
<packaging>jar</packaging>
<name>RocketMQ Spring Boot AutoConfigure</name>
- <description>SRocketMQ Spring Boot AutoConfigure</description>
+ <description>RocketMQ Spring Boot AutoConfigure</description>
<url>https://github.com/apache/rocketmq-spring</url>
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
index 1c019bb..b62d20a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@@ -17,19 +17,14 @@
package org.apache.rocketmq.spring.autoconfigure;
-import java.lang.reflect.Field;
import java.util.Map;
import java.util.Objects;
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.TransactionMQProducer;
-import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
-import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
+import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
@@ -104,7 +99,6 @@
}
private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) {
- DefaultMQProducer producer = null;
RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
if (producerConfig == null) {
@@ -118,35 +112,15 @@
ak = StringUtils.isEmpty(ak) ? producerConfig.getAccessKey() : annotation.accessKey();
String sk = environment.resolvePlaceholders(annotation.secretKey());
sk = StringUtils.isEmpty(sk) ? producerConfig.getSecretKey() : annotation.secretKey();
+ boolean isEnableMsgTrace = annotation.enableMsgTrace();
String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic());
customizedTraceTopic = StringUtils.isEmpty(customizedTraceTopic) ? producerConfig.getCustomizedTraceTopic() : customizedTraceTopic;
- boolean isEnableAcl = !StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk);
-
- if (isEnableAcl) {
- producer = new TransactionMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)));
- producer.setVipChannelEnabled(false);
- } else {
- producer = new TransactionMQProducer(groupName);
- }
-
- if (annotation.enableMsgTrace()) {
- try {
- AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, isEnableAcl ? new AclClientRPCHook(new SessionCredentials(ak, sk)) : null);
- dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
- Field field = DefaultMQProducer.class.getDeclaredField("traceDispatcher");
- field.setAccessible(true);
- field.set(producer, dispatcher);
- producer.getDefaultMQProducerImpl().registerSendMessageHook(
- new SendMessageTraceHookImpl(dispatcher));
- } catch (Throwable e) {
- log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
- }
- }
+ DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);
producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(annotation.sendMessageTimeout() == -1 ? producerConfig.getSendMessageTimeout() : annotation.sendMessageTimeout());
- producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendAsyncFailed());
+ producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendAsyncFailed());
producer.setRetryTimesWhenSendAsyncFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendAsyncFailed() : annotation.retryTimesWhenSendAsyncFailed());
producer.setMaxMessageSize(annotation.maxMessageSize() == -1 ? producerConfig.getMaxMessageSize() : annotation.maxMessageSize());
producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold() == -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold());
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index 14f7160..e6131e7 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -17,18 +17,13 @@
package org.apache.rocketmq.spring.autoconfigure;
-import java.lang.reflect.Field;
import javax.annotation.PostConstruct;
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.TransactionMQProducer;
-import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
-import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
+import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -84,30 +79,12 @@
String accessChannel = rocketMQProperties.getAccessChannel();
- DefaultMQProducer producer;
String ak = rocketMQProperties.getProducer().getAccessKey();
String sk = rocketMQProperties.getProducer().getSecretKey();
- boolean isEnableAcl = !StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk);
- if (isEnableAcl) {
- producer = new TransactionMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)));
- producer.setVipChannelEnabled(false);
- } else {
- producer = new TransactionMQProducer(groupName);
- }
+ boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();
+ String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();
- if (rocketMQProperties.getProducer().isEnableMsgTrace()) {
- try {
- AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(rocketMQProperties.getProducer().getCustomizedTraceTopic(), isEnableAcl ? new AclClientRPCHook(new SessionCredentials(ak, sk)) : null);
- dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
- Field field = DefaultMQProducer.class.getDeclaredField("traceDispatcher");
- field.setAccessible(true);
- field.set(producer, dispatcher);
- producer.getDefaultMQProducerImpl().registerSendMessageHook(
- new SendMessageTraceHookImpl(dispatcher));
- } catch (Throwable e) {
- log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
- }
- }
+ DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);
producer.setNamesrvAddr(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index 832d021..1957389 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -33,7 +33,7 @@
private String nameServer;
/**
- * Enum type for accesChannel, values: LOCAL, CLOUD
+ * Enum type for accessChannel, values: LOCAL, CLOUD
*/
private String accessChannel;
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index ad59bc9..ecf2354 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -209,14 +209,14 @@
return selectorType;
}
- public void setSelectorExpression(String selectorExpression) {
- this.selectorExpression = selectorExpression;
- }
-
public String getSelectorExpression() {
return selectorExpression;
}
+ public void setSelectorExpression(String selectorExpression) {
+ this.selectorExpression = selectorExpression;
+ }
+
public MessageModel getMessageModel() {
return messageModel;
}
@@ -296,7 +296,6 @@
return Integer.MAX_VALUE;
}
-
@Override
public void afterPropertiesSet() throws Exception {
initRocketMQPushConsumer();
@@ -328,53 +327,6 @@
this.name = name;
}
- public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
-
- @SuppressWarnings("unchecked")
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- rocketMQListener.onMessage(doConvertMessage(messageExt));
- long costTime = System.currentTimeMillis() - now;
- log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
- } catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
-
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- }
-
- public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
-
- @SuppressWarnings("unchecked")
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- rocketMQListener.onMessage(doConvertMessage(messageExt));
- long costTime = System.currentTimeMillis() - now;
- log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
- } catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- }
-
- return ConsumeOrderlyStatus.SUCCESS;
- }
- }
-
-
@SuppressWarnings("unchecked")
private Object doConvertMessage(MessageExt messageExt) {
if (Objects.equals(messageType, MessageExt.class)) {
@@ -402,7 +354,6 @@
}
}
-
private MethodParameter getMethodParameter() {
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
Type messageType = this.getMessageType();
@@ -470,7 +421,7 @@
} else {
log.debug("Access-key or secret-key not configure in " + this + ".");
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
- this.applicationContext.getEnvironment().
+ this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
@@ -528,4 +479,50 @@
}
+ public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+ for (MessageExt messageExt : msgs) {
+ log.debug("received msg: {}", messageExt);
+ try {
+ long now = System.currentTimeMillis();
+ rocketMQListener.onMessage(doConvertMessage(messageExt));
+ long costTime = System.currentTimeMillis() - now;
+ log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+ } catch (Exception e) {
+ log.warn("consume message failed. messageExt:{}", messageExt, e);
+ context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
+ }
+
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ }
+
+ public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+ for (MessageExt messageExt : msgs) {
+ log.debug("received msg: {}", messageExt);
+ try {
+ long now = System.currentTimeMillis();
+ rocketMQListener.onMessage(doConvertMessage(messageExt));
+ long costTime = System.currentTimeMillis() - now;
+ log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+ } catch (Exception e) {
+ log.warn("consume message failed. messageExt:{}", messageExt, e);
+ context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
+ return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+ }
+ }
+
+ return ConsumeOrderlyStatus.SUCCESS;
+ }
+ }
+
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index 4c731f2..5352b19 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -17,11 +17,19 @@
package org.apache.rocketmq.spring.support;
import com.fasterxml.jackson.databind.ObjectMapper;
+import java.lang.reflect.Field;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.Objects;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
+import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
@@ -39,10 +47,6 @@
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.Objects;
-
public class RocketMQUtil {
private final static Logger log = LoggerFactory.getLogger(RocketMQUtil.class);
@@ -73,7 +77,7 @@
}
// Never happen
- log.warn("Failed to covert enum type RocketMQLocalTransactionState.%s", state);
+ log.warn("Failed to covert enum type RocketMQLocalTransactionState {}.", state);
return LocalTransactionState.UNKNOW;
}
@@ -136,9 +140,9 @@
byte[] payloads;
if (payloadObj instanceof String) {
- payloads = ((String)payloadObj).getBytes(Charset.forName(charset));
+ payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
} else if (payloadObj instanceof byte[]) {
- payloads = (byte[])message.getPayload();
+ payloads = (byte[]) message.getPayload();
} else {
try {
String jsonObj = objectMapper.writeValueAsString(payloadObj);
@@ -150,11 +154,6 @@
return getAndWrapMessage(destination, message.getHeaders(), payloads);
}
- public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
- String destination, org.springframework.messaging.Message<byte[]> message) {
- return getAndWrapMessage(destination, message.getHeaders(), message.getPayload());
- }
-
private static Message getAndWrapMessage(String destination, MessageHeaders headers, byte[] payloads) {
if (destination == null || destination.length() < 1) {
return null;
@@ -210,11 +209,11 @@
throw new RuntimeException("the message cannot be empty");
}
if (payloadObj instanceof String) {
- payloads = ((String)payloadObj).getBytes(Charset.forName(charset));
+ payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
} else if (payloadObj instanceof byte[]) {
- payloads = (byte[])message.getPayload();
+ payloads = (byte[]) message.getPayload();
} else {
- String jsonObj = (String)messageConverter.fromMessage(message, payloadObj.getClass());
+ String jsonObj = (String) messageConverter.fromMessage(message, payloadObj.getClass());
if (null == jsonObj) {
throw new RuntimeException(String.format(
"empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
@@ -247,11 +246,41 @@
public static String getInstanceName(RPCHook rpcHook, String identify) {
String separator = "|";
StringBuilder instanceName = new StringBuilder();
- SessionCredentials sessionCredentials = ((AclClientRPCHook)rpcHook).getSessionCredentials();
+ SessionCredentials sessionCredentials = ((AclClientRPCHook) rpcHook).getSessionCredentials();
instanceName.append(sessionCredentials.getAccessKey())
.append(separator).append(sessionCredentials.getSecretKey())
.append(separator).append(identify)
.append(separator).append(UtilAll.getPid());
return instanceName.toString();
}
+
+ public static DefaultMQProducer createDefaultMQProducer(String groupName, String ak, String sk,
+ boolean isEnableMsgTrace, String customizedTraceTopic) {
+
+ boolean isEnableAcl = !StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk);
+ DefaultMQProducer producer;
+ if (isEnableAcl) {
+ producer = new TransactionMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)));
+ producer.setVipChannelEnabled(false);
+ } else {
+ producer = new TransactionMQProducer(groupName);
+ }
+
+ if (isEnableMsgTrace) {
+ try {
+ AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, isEnableAcl ? new AclClientRPCHook(new SessionCredentials(ak, sk)) : null);
+ dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
+ Field field = DefaultMQProducer.class.getDeclaredField("traceDispatcher");
+ field.setAccessible(true);
+ field.set(producer, dispatcher);
+ producer.getDefaultMQProducerImpl().registerSendMessageHook(
+ new SendMessageTraceHookImpl(dispatcher));
+ } catch (Throwable e) {
+ log.error("system trace hook init failed ,maybe can't send msg trace data");
+ }
+ }
+
+ return producer;
+ }
+
}