Merge pull request #49 from fangjian0423/header-conversion!
[ISSUE-41] Optimize header conversion
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQHeaders.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQHeaders.java
index e21aeab..7afefbd 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQHeaders.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQHeaders.java
@@ -20,6 +20,7 @@
* Represents the RocketMQ message protocol that is used during the data exchange.
*/
public class RocketMQHeaders {
+ public static final String PREFIX = "rocketmq_";
public static final String KEYS = "KEYS";
public static final String TAGS = "TAGS";
public static final String TOPIC = "TOPIC";
@@ -30,5 +31,4 @@
public static final String QUEUE_ID = "QUEUE_ID";
public static final String SYS_FLAG = "SYS_FLAG";
public static final String TRANSACTION_ID = "TRANSACTION_ID";
- public static final String PROPERTIES = "PROPERTIES";
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index 16a6bcf..dc35413 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -23,6 +23,7 @@
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
@@ -31,10 +32,12 @@
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
+import java.util.Map;
import java.util.Objects;
public class RocketMQUtil {
@@ -77,37 +80,48 @@
public static org.springframework.messaging.Message convertToSpringMessage(
org.apache.rocketmq.common.message.MessageExt message) {
- org.springframework.messaging.Message retMessage =
+ MessageBuilder messageBuilder =
MessageBuilder.withPayload(message.getBody()).
- setHeader(RocketMQHeaders.KEYS, message.getKeys()).
- setHeader(RocketMQHeaders.TAGS, message.getTags()).
- setHeader(RocketMQHeaders.TOPIC, message.getTopic()).
- setHeader(RocketMQHeaders.MESSAGE_ID, message.getMsgId()).
- setHeader(RocketMQHeaders.BORN_TIMESTAMP, message.getBornTimestamp()).
- setHeader(RocketMQHeaders.BORN_HOST, message.getBornHostString()).
- setHeader(RocketMQHeaders.FLAG, message.getFlag()).
- setHeader(RocketMQHeaders.QUEUE_ID, message.getQueueId()).
- setHeader(RocketMQHeaders.SYS_FLAG, message.getSysFlag()).
- setHeader(RocketMQHeaders.TRANSACTION_ID, message.getTransactionId()).
- setHeader(RocketMQHeaders.PROPERTIES, message.getProperties()).
- build();
+ setHeader(toRocketHeaderKey(RocketMQHeaders.KEYS), message.getKeys()).
+ setHeader(toRocketHeaderKey(RocketMQHeaders.TAGS), message.getTags()).
+ setHeader(toRocketHeaderKey(RocketMQHeaders.TOPIC), message.getTopic()).
+ setHeader(toRocketHeaderKey(RocketMQHeaders.MESSAGE_ID), message.getMsgId()).
+ setHeader(toRocketHeaderKey(RocketMQHeaders.BORN_TIMESTAMP), message.getBornTimestamp()).
+ setHeader(toRocketHeaderKey(RocketMQHeaders.BORN_HOST), message.getBornHostString()).
+ setHeader(toRocketHeaderKey(RocketMQHeaders.FLAG), message.getFlag()).
+ setHeader(toRocketHeaderKey(RocketMQHeaders.QUEUE_ID), message.getQueueId()).
+ setHeader(toRocketHeaderKey(RocketMQHeaders.SYS_FLAG), message.getSysFlag()).
+ setHeader(toRocketHeaderKey(RocketMQHeaders.TRANSACTION_ID), message.getTransactionId());
+ addUserProperties(message.getProperties(), messageBuilder);
+ return messageBuilder.build();
+ }
- return retMessage;
+ public static String toRocketHeaderKey(String rawKey) {
+ return RocketMQHeaders.PREFIX + rawKey;
+ }
+
+ private static void addUserProperties(Map<String, String> properties, MessageBuilder messageBuilder) {
+ if (!CollectionUtils.isEmpty(properties)) {
+ properties.forEach((key, val) -> {
+ if (!MessageConst.STRING_HASH_SET.contains(key) && !MessageHeaders.ID.equals(key)
+ && !MessageHeaders.TIMESTAMP.equals(key)) {
+ messageBuilder.setHeader(key, val);
+ }
+ });
+ }
}
public static org.springframework.messaging.Message convertToSpringMessage(
org.apache.rocketmq.common.message.Message message) {
- org.springframework.messaging.Message retMessage =
+ MessageBuilder messageBuilder =
MessageBuilder.withPayload(message.getBody()).
- setHeader(RocketMQHeaders.KEYS, message.getKeys()).
- setHeader(RocketMQHeaders.TAGS, message.getTags()).
- setHeader(RocketMQHeaders.TOPIC, message.getTopic()).
- setHeader(RocketMQHeaders.FLAG, message.getFlag()).
- setHeader(RocketMQHeaders.TRANSACTION_ID, message.getTransactionId()).
- setHeader(RocketMQHeaders.PROPERTIES, message.getProperties()).
- build();
-
- return retMessage;
+ setHeader(toRocketHeaderKey(RocketMQHeaders.KEYS), message.getKeys()).
+ setHeader(toRocketHeaderKey(RocketMQHeaders.TAGS), message.getTags()).
+ setHeader(toRocketHeaderKey(RocketMQHeaders.TOPIC), message.getTopic()).
+ setHeader(toRocketHeaderKey(RocketMQHeaders.FLAG), message.getFlag()).
+ setHeader(toRocketHeaderKey(RocketMQHeaders.TRANSACTION_ID), message.getTransactionId());
+ addUserProperties(message.getProperties(), messageBuilder);
+ return messageBuilder.build();
}
public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
@@ -160,11 +174,12 @@
rocketMsg.setWaitStoreMsgOK(waitStoreMsgOK);
headers.entrySet().stream()
- .filter(entry -> !Objects.equals(entry.getKey(), RocketMQHeaders.KEYS)
- && !Objects.equals(entry.getKey(), "FLAG")
- && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "KEYS", "FLAG", "WAIT_STORE_MSG_OK"
+ .filter(entry -> !Objects.equals(entry.getKey(), "FLAG")
+ && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "FLAG", "WAIT_STORE_MSG_OK"
.forEach(entry -> {
- rocketMsg.putUserProperty("USERS_" + entry.getKey(), String.valueOf(entry.getValue())); // add other properties with prefix "USERS_"
+ if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
+ rocketMsg.putUserProperty(entry.getKey(), String.valueOf(entry.getValue()));
+ }
});
}
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
index bd9e94b..3704802 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
@@ -23,6 +23,8 @@
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class RocketMQUtilTest {
@@ -56,4 +58,36 @@
assertTrue(Arrays.equals((byte[])msgWithBytePayload.getPayload(), rocketMsg2.getBody()));
}
+ @Test
+ public void testHeaderConvertToRMQMsg() {
+ Message msgWithStringPayload = MessageBuilder.withPayload("test body")
+ .setHeader("test", 1)
+ .setHeader(RocketMQHeaders.TAGS, "tags")
+ .setHeader(RocketMQHeaders.KEYS, "my_keys")
+ .build();
+ org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
+ "UTF-8", "test-topic", msgWithStringPayload);
+ assertEquals(String.valueOf("1"), rocketMsg.getProperty("test"));
+ assertNull(rocketMsg.getProperty(RocketMQHeaders.TAGS));
+ assertEquals("my_keys", rocketMsg.getProperty(RocketMQHeaders.KEYS));
+ }
+
+ @Test
+ public void testHeaderConvertToSpringMsg() {
+ org.apache.rocketmq.common.message.Message rmqMsg = new org.apache.rocketmq.common.message.Message();
+ rmqMsg.setBody("test body".getBytes());
+ rmqMsg.setTopic("test-topic");
+ rmqMsg.putUserProperty("test", "1");
+ rmqMsg.setTags("tags");
+ Message springMsg = RocketMQUtil.convertToSpringMessage(rmqMsg);
+ assertEquals(String.valueOf("1"), springMsg.getHeaders().get("test"));
+ assertEquals("tags", springMsg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS));
+
+ org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
+ "UTF-8", "test-topic", springMsg);
+ assertEquals(String.valueOf("1"), rocketMsg.getProperty("test"));
+ assertEquals(String.valueOf("tags"), rocketMsg.getProperty(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS));
+ assertNull(rocketMsg.getTags());
+ }
+
}
\ No newline at end of file