Merge pull request #49 from fangjian0423/header-conversion

[ISSUE-41] Optimize header conversion
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQHeaders.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQHeaders.java
index e21aeab..7afefbd 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQHeaders.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQHeaders.java
@@ -20,6 +20,7 @@
  * Represents the RocketMQ message protocol that is used during the data exchange.
  */
 public class RocketMQHeaders {
+    public static final String PREFIX = "rocketmq_";
     public static final String KEYS = "KEYS";
     public static final String TAGS = "TAGS";
     public static final String TOPIC = "TOPIC";
@@ -30,5 +31,4 @@
     public static final String QUEUE_ID = "QUEUE_ID";
     public static final String SYS_FLAG = "SYS_FLAG";
     public static final String TRANSACTION_ID = "TRANSACTION_ID";
-    public static final String PROPERTIES = "PROPERTIES";
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index 16a6bcf..dc35413 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -23,6 +23,7 @@
 import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.client.producer.TransactionListener;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
@@ -31,10 +32,12 @@
 import org.springframework.messaging.MessageHeaders;
 import org.springframework.messaging.MessagingException;
 import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.nio.charset.Charset;
+import java.util.Map;
 import java.util.Objects;
 
 public class RocketMQUtil {
@@ -77,37 +80,48 @@
 
     public static org.springframework.messaging.Message convertToSpringMessage(
         org.apache.rocketmq.common.message.MessageExt message) {
-        org.springframework.messaging.Message retMessage =
+        MessageBuilder messageBuilder =
             MessageBuilder.withPayload(message.getBody()).
-                setHeader(RocketMQHeaders.KEYS, message.getKeys()).
-                setHeader(RocketMQHeaders.TAGS, message.getTags()).
-                setHeader(RocketMQHeaders.TOPIC, message.getTopic()).
-                setHeader(RocketMQHeaders.MESSAGE_ID, message.getMsgId()).
-                setHeader(RocketMQHeaders.BORN_TIMESTAMP, message.getBornTimestamp()).
-                setHeader(RocketMQHeaders.BORN_HOST, message.getBornHostString()).
-                setHeader(RocketMQHeaders.FLAG, message.getFlag()).
-                setHeader(RocketMQHeaders.QUEUE_ID, message.getQueueId()).
-                setHeader(RocketMQHeaders.SYS_FLAG, message.getSysFlag()).
-                setHeader(RocketMQHeaders.TRANSACTION_ID, message.getTransactionId()).
-                setHeader(RocketMQHeaders.PROPERTIES, message.getProperties()).
-                build();
+                setHeader(toRocketHeaderKey(RocketMQHeaders.KEYS), message.getKeys()).
+                setHeader(toRocketHeaderKey(RocketMQHeaders.TAGS), message.getTags()).
+                setHeader(toRocketHeaderKey(RocketMQHeaders.TOPIC), message.getTopic()).
+                setHeader(toRocketHeaderKey(RocketMQHeaders.MESSAGE_ID), message.getMsgId()).
+                setHeader(toRocketHeaderKey(RocketMQHeaders.BORN_TIMESTAMP), message.getBornTimestamp()).
+                setHeader(toRocketHeaderKey(RocketMQHeaders.BORN_HOST), message.getBornHostString()).
+                setHeader(toRocketHeaderKey(RocketMQHeaders.FLAG), message.getFlag()).
+                setHeader(toRocketHeaderKey(RocketMQHeaders.QUEUE_ID), message.getQueueId()).
+                setHeader(toRocketHeaderKey(RocketMQHeaders.SYS_FLAG), message.getSysFlag()).
+                setHeader(toRocketHeaderKey(RocketMQHeaders.TRANSACTION_ID), message.getTransactionId());
+        addUserProperties(message.getProperties(), messageBuilder);
+        return messageBuilder.build();
+    }
 
-        return retMessage;
+    public static String toRocketHeaderKey(String rawKey) {
+        return RocketMQHeaders.PREFIX + rawKey;
+    }
+
+    private static void addUserProperties(Map<String, String> properties, MessageBuilder messageBuilder) {
+        if (!CollectionUtils.isEmpty(properties)) {
+            properties.forEach((key, val) -> {
+                if (!MessageConst.STRING_HASH_SET.contains(key) && !MessageHeaders.ID.equals(key)
+                    && !MessageHeaders.TIMESTAMP.equals(key)) {
+                    messageBuilder.setHeader(key, val);
+                }
+            });
+        }
     }
 
     public static org.springframework.messaging.Message convertToSpringMessage(
         org.apache.rocketmq.common.message.Message message) {
-        org.springframework.messaging.Message retMessage =
+        MessageBuilder messageBuilder =
             MessageBuilder.withPayload(message.getBody()).
-                setHeader(RocketMQHeaders.KEYS, message.getKeys()).
-                setHeader(RocketMQHeaders.TAGS, message.getTags()).
-                setHeader(RocketMQHeaders.TOPIC, message.getTopic()).
-                setHeader(RocketMQHeaders.FLAG, message.getFlag()).
-                setHeader(RocketMQHeaders.TRANSACTION_ID, message.getTransactionId()).
-                setHeader(RocketMQHeaders.PROPERTIES, message.getProperties()).
-                build();
-
-        return retMessage;
+                setHeader(toRocketHeaderKey(RocketMQHeaders.KEYS), message.getKeys()).
+                setHeader(toRocketHeaderKey(RocketMQHeaders.TAGS), message.getTags()).
+                setHeader(toRocketHeaderKey(RocketMQHeaders.TOPIC), message.getTopic()).
+                setHeader(toRocketHeaderKey(RocketMQHeaders.FLAG), message.getFlag()).
+                setHeader(toRocketHeaderKey(RocketMQHeaders.TRANSACTION_ID), message.getTransactionId());
+        addUserProperties(message.getProperties(), messageBuilder);
+        return messageBuilder.build();
     }
 
     public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
@@ -160,11 +174,12 @@
             rocketMsg.setWaitStoreMsgOK(waitStoreMsgOK);
 
             headers.entrySet().stream()
-                .filter(entry -> !Objects.equals(entry.getKey(), RocketMQHeaders.KEYS)
-                    && !Objects.equals(entry.getKey(), "FLAG")
-                    && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "KEYS", "FLAG", "WAIT_STORE_MSG_OK"
+                .filter(entry -> !Objects.equals(entry.getKey(), "FLAG")
+                    && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "FLAG", "WAIT_STORE_MSG_OK"
                 .forEach(entry -> {
-                    rocketMsg.putUserProperty("USERS_" + entry.getKey(), String.valueOf(entry.getValue())); // add other properties with prefix "USERS_"
+                    if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
+                        rocketMsg.putUserProperty(entry.getKey(), String.valueOf(entry.getValue()));
+                    }
                 });
 
         }
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
index bd9e94b..3704802 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
@@ -23,6 +23,8 @@
 import org.springframework.messaging.Message;
 import org.springframework.messaging.support.MessageBuilder;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class RocketMQUtilTest {
@@ -56,4 +58,36 @@
         assertTrue(Arrays.equals((byte[])msgWithBytePayload.getPayload(), rocketMsg2.getBody()));
     }
 
+    @Test
+    public void testHeaderConvertToRMQMsg() {
+        Message msgWithStringPayload = MessageBuilder.withPayload("test body")
+            .setHeader("test", 1)
+            .setHeader(RocketMQHeaders.TAGS, "tags")
+            .setHeader(RocketMQHeaders.KEYS, "my_keys")
+            .build();
+        org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
+            "UTF-8", "test-topic", msgWithStringPayload);
+        assertEquals(String.valueOf("1"), rocketMsg.getProperty("test"));
+        assertNull(rocketMsg.getProperty(RocketMQHeaders.TAGS));
+        assertEquals("my_keys", rocketMsg.getProperty(RocketMQHeaders.KEYS));
+    }
+
+    @Test
+    public void testHeaderConvertToSpringMsg() {
+        org.apache.rocketmq.common.message.Message rmqMsg = new org.apache.rocketmq.common.message.Message();
+        rmqMsg.setBody("test body".getBytes());
+        rmqMsg.setTopic("test-topic");
+        rmqMsg.putUserProperty("test", "1");
+        rmqMsg.setTags("tags");
+        Message springMsg = RocketMQUtil.convertToSpringMessage(rmqMsg);
+        assertEquals(String.valueOf("1"), springMsg.getHeaders().get("test"));
+        assertEquals("tags", springMsg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS));
+
+        org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
+            "UTF-8", "test-topic", springMsg);
+        assertEquals(String.valueOf("1"), rocketMsg.getProperty("test"));
+        assertEquals(String.valueOf("tags"), rocketMsg.getProperty(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS));
+        assertNull(rocketMsg.getTags());
+    }
+
 }
\ No newline at end of file