[ISSUE #33] Add 6 features according to original message test classes to bdd resource (#34)

* features

* Delete ConsumerGroup.feature

* features

* Delete bdd/src/main/resources/consumer directory

* features

* features

---------

Co-authored-by: alani <ninan.ni@alibaba-inc.com>
diff --git a/bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java b/bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java
index c7bdc2d..021186c 100644
--- a/bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java
+++ b/bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java
@@ -23,9 +23,6 @@
 import io.cucumber.java.en.When;
 
 public class ClientInitStepdefs {
-    @And("Shutdown the producer and consumer")
-    public void shutdownTheProducerAndConsumer() {
-    }
 
     @And("Create a Producer, set the <NameServer>, <RequestTimeout>")
     public void createAProducerSetTheNameServerRequestTimeout() {
@@ -46,11 +43,7 @@
 
     @And("Send {string} messages {string}")
     public void sendMessages(String arg0, String arg1) {
-
-    }
-
-    @And("Shutdown the producer")
-    public void shutdownTheProducer() {
+        
 
     }
 
@@ -90,8 +83,9 @@
 
     }
 
-    @When("Create a PushConsumer, set the Endpoint\\({string}), ConsumerGroup\\({string}), SubscriptionExpression\\({string}), Topic\\({string}), MessageListener\\({string})")
-    public void createAPushConsumerSetTheEndpointConsumerGroupSubscriptionExpressionTopicMessageListener(String arg0, String arg1, String arg2, String arg3, String arg4) {
+    @When("Create a PushConsumer, set the Endpoint\\({string}), ConsumerGroup\\({string}), Tag\\({string}), Topic\\({string}), MessageListener\\({string})")
+    public void createAPushConsumerSetTheEndpointConsumerGroupTagTopicMessageListener(String arg0, String arg1, String arg2, String arg3, String arg4) {
+        
 
     }
 
@@ -100,13 +94,15 @@
 
     }
 
-    @Then("Create a message, including the Topic\\({string}), SubscriptionExpression\\({string}), Key\\({string}), and Body\\({string})")
-    public void createAMessageIncludingTheTopicSubscriptionExpressionKeyAndBody(String arg0, String arg1, String arg2, String arg3) {
+    @Then("Create a message, including the Topic\\({string}), Tag\\({string}), Key\\({string}), and Body\\({string})")
+    public void createAMessageIncludingTheTopicTagKeyAndBody(String arg0, String arg1, String arg2, String arg3) {
+        
 
     }
 
     @And("Create a Producer, set the Endpoint\\({string}), RequestTimeout:\\({string}), Topic\\({string})")
     public void createAProducerSetTheEndpointRequestTimeoutTopic(String arg0, String arg1, String arg2) {
+        
 
     }
 
@@ -114,4 +110,112 @@
     public void checkSendMessageFailed() {
 
     }
+
+    @And("Check each MessageGroup consumes up to {int} messages separately and is consumed orderly")
+    public void checkEachMessageGroupConsumesUpToMessagesSeparatelyAndIsConsumedOrderly(int arg0) {
+    }
+    
+
+    @And("Create a SimpleConsumer, set the Endpoint\\({string}), ConsumerGroup\\({string}), SubscriptionExpression\\({string}), Topic\\({string}), Duration\\({string})")
+    public void createASimpleConsumerSetTheEndpointConsumerGroupSubscriptionExpressionTopicDuration(String arg0, String arg1, String arg2, String arg3, String arg4) {
+    }
+
+    @And("Check the subscribed message body is equal to {string}")
+    public void checkTheSubscribedMessageBodyEqualsTo(String arg0) {
+    }
+
+    @And("Create a message, including the Topic\\({string}), Tag\\({string}), Key\\({string}), and Body\\(null)")
+    public void createAMessageIncludingTheTopicTagKeyAndBodyNull(String arg0, String arg1, String arg2) {
+    }
+
+
+    @And("Create a message, including the Topic\\(null), Tag\\({string}), Key\\({string}), and Body\\({string})")
+    public void createAMessageIncludingTheTopicNullTagKeyAndBody(String arg0, String arg1, String arg2) {
+        
+    }
+
+    @And("Create a message, including the Topic\\({string}), Tag\\(null), Key\\({string}), and Body\\({string})")
+    public void createAMessageIncludingTheTopicTagNullKeyAndBody(String arg0, String arg1, String arg2) {
+
+    }
+
+    @And("Create a message, including the Topic\\({string}), SubscriptionExpression\\({string}), Key\\(RandomStringUtils.randomAlphabetic\\({int} * {int} + {int})), and Body\\({string})")
+    public void createAMessageIncludingTheTopicSubscriptionExpressionKeyRandomStringUtilsRandomAlphabeticAndBody(String arg0, String arg1, int arg2, int arg3, int arg4, String arg5) {
+
+    }
+
+    @Then("Check exceptions can be thrown")
+    public void checkBuildMethodThatCanThrowExceptions() {
+    }
+
+    @Given("Create a {string} topic:{string} if not exist")
+    public void createATopicIfNotExist(String arg0, String arg1) {
+
+
+    }
+
+    @And("Shutdown the producer and consumer if they are started")
+    public void shutdownTheProducerAndConsumerIfTheyAreStarted() {
+    }
+
+    @And("Create a message, including the Topic\\({string}), Tag\\({string}), Key\\({string}, {string}), and Body\\({string})")
+    public void createAMessageIncludingTheTopicTagKeyAndBody(String arg0, String arg1, String arg2, String arg3, String arg4) {
+        
+    }
+
+    @And("Send a half message")
+    public void sendAHalfMessage() {
+        
+    }
+
+    @And("Create a transaction branch")
+    public void createATransactionBranch() {
+    }
+
+    @And("Create a message, including the Topic\\({string}), Tag\\({string}), and Body\\({string})")
+    public void createAMessageIncludingTheTopicTagAndBody(String arg0, String arg1, String arg2) {
+        
+    }
+
+    @And("Create a message, including the Topic\\({string}), Tag\\({string}), Body\\({string}), deliveryTimestamp\\({string})")
+    public void createAMessageIncludingTheTopicTagBodyDeliveryTimestamp(String arg0, String arg1, String arg2, String arg3) {
+        
+    }
+
+    @And("Create a message, including the Topic\\({string}), Tag\\({string}), Body\\({string}), messageGroup\\({string})")
+    public void createAMessageIncludingTheTopicTagBodyMessageGroup(String arg0, String arg1, String arg2, String arg3) {
+        
+    }
+
+    @And("Create a message, including the Topic\\({string}), Tag\\({string}), Key\\({string}), Value\\({string}), Body\\({string}), msgKey\\({string})")
+    public void createAMessageIncludingTheTopicTagKeyValueBodyMsgKey(String arg0, String arg1, String arg2, String arg3, String arg4, String arg5) {
+
+    }
+
+    @And("Create a message, including the Topic\\({string}), and messageProperty\\({string}, {string})")
+    public void createAMessageIncludingTheTopicAndMessageProperty(String arg0, String arg1, String arg2) {
+    }
+
+    @And("Create a message, including the Topic\\({string}), Body\\({string}), and messageProperty\\({string})")
+    public void createAMessageIncludingTheTopicBodyAndMessageProperty(String arg0, String arg1, String arg2) {
+        
+    }
+
+    @And("Create a message, including the Topic\\({string}), Body\\({string}), messageGroup\\({string}), and messageProperty\\({string})")
+    public void createAMessageIncludingTheTopicBodyMessageGroupAndMessageProperty(String arg0, String arg1, String arg2, String arg3) {
+    }
+
+    @And("Set message {string} {string} times")
+    public void setMessageTimes(String arg0, String arg1) {
+        
+    }
+
+    @And("Set Key\\({string}), Value\\({string})")
+    public void setKeyValue(String arg0, String arg1) {
+
+    }
+
+    @And("Set messageProperty {string} to {string} and {string} to {string}")
+    public void setMessagePropertyToAndTo(String arg0, String arg1, String arg2, String arg3) {
+    }
 }
diff --git a/bdd/src/main/resources/delay.feature b/bdd/src/main/resources/delay.feature
index 511403a..7842fbe 100644
--- a/bdd/src/main/resources/delay.feature
+++ b/bdd/src/main/resources/delay.feature
@@ -17,32 +17,32 @@
 
   Scenario:  Send 10 messages set delivery timestamp after 30s after the current system time. Expect to consume all 10 messages after 30s
     Given Create a "DELAY" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
-    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpression("TagA"), Topic("random-topic"), MessageListener("default")
+    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
     And Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
-    Then Create a message, including the Topic("random-topic"), SubscriptionExpression("TagA"), Key("Key"), and Body("Body")
+    Then Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body")
     And Set message "DeliveryTimestamp" to 20 seconds before the current system time
     And  Send "10" messages "synchronous"
     Then Check all messages that can be consumed within 60s
     And Check the received message's "DeliveryTimestamp" property "isNotNull" and value is expected
     And Check consume all messages immediately
-    And Shutdown the producer and consumer
+    And Shutdown the producer and consumer if they are started
 
   Scenario:  Send 10 messages set delivery timestamp 10s before the current system time. Expect timing does not take effect, all 10 messages can be consumed immediately
     Given Create a "DELAY" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
-    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpression("TagA"), Topic("random-topic"), MessageListener("default")
+    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
     And Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
-    Then Create a message, including the Topic("random-topic"), SubscriptionExpression("TagA"), Key("Key"), and Body("Body")
+    Then Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body")
     And Set message "DeliveryTimestamp" to 30 seconds after the current system time
     And  Send "10" messages "synchronous"
     Then Check consume all 10 messages after 30s(±5s)
     And Check the received message's "DeliveryTimestamp" property "isNotNull" and value is expected
-    And Shutdown the producer and consumer
+    And Shutdown the producer and consumer if they are started
 
   Scenario:  Send a message set delivery timestamp 24h+5s after the current system time. Expect send message failed
     Given Create a "DELAY" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
     And Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
-    Then Create a message, including the Topic("random-topic"), SubscriptionExpression("TagA"), Key("Key"), and Body("Body")
+    Then Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body")
     And Set message "DeliveryTimestamp" to 86405 seconds after the current system time
     And  Send "1" messages "synchronous"
     Then Check send message failed
-    And Shutdown the producer
\ No newline at end of file
+    And Shutdown the producer and consumer if they are started
\ No newline at end of file
diff --git a/bdd/src/main/resources/message/MessageBodyContent.feature b/bdd/src/main/resources/message/MessageBodyContent.feature
new file mode 100644
index 0000000..1d98e97
--- /dev/null
+++ b/bdd/src/main/resources/message/MessageBodyContent.feature
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+Feature: Test message body contents
+
+  Scenario Outline: Send normal message, setting message body, expect send and consume success
+    Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    Then Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("<MessageBodyContent>")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Check the subscribed message body is equal to "<MessageBodyContent>"
+    And Shutdown the producer and consumer if they are started
+
+    Examples:
+      | MessageBodyContent |
+      |                    |
+      |       中文字符       |
+      |         😱         |
+
+
+
+
diff --git a/bdd/src/main/resources/message/MessageKey.feature b/bdd/src/main/resources/message/MessageKey.feature
new file mode 100644
index 0000000..6345a0a
--- /dev/null
+++ b/bdd/src/main/resources/message/MessageKey.feature
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+Feature: Test message key
+
+  Scenario: Message Key beyond 16KB, expect throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Tag("random-tag"), Key("size:16kB+1"), and Body("random-body")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Message Key beyond 16KB, expect throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Tag("random-tag"), Key("\u0000"), and Body("random-body")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Message Key equals 16KB, expect send and consume success
+    Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Tag("random-tag"), Key("size:16KB"), and Body("random-body")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Message key contains Chinese, expect send and consume success
+    Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Tag("random-tag"), Key("中文字符"), and Body("random-body")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: The message contains multiple keys, expect send and consume success
+    Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Tag("random-tag"), Key("random-key1", "random-key2"), and Body("random-body")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+
diff --git a/bdd/src/main/resources/message/MessageProperties.feature b/bdd/src/main/resources/message/MessageProperties.feature
new file mode 100644
index 0000000..106c935
--- /dev/null
+++ b/bdd/src/main/resources/message/MessageProperties.feature
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+Feature: Test message properties
+
+  Scenario: producer invoke send(messageBody=null), expect build message throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Tag("random-tag"), Key("Key"), and Body(null)
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: producer invoke send(topic=""), expect throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic(""), Tag("random-tag"), Key("Key"), and Body("Body")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: producer invoke send(topic=null), expect throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic(null), Tag("random-tag"), Key("Key"), and Body("Body")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: producer invoke send(tag=null), expect throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Tag(null), Key("Key"), and Body("Body")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: producer invoke send(tag=""), expect throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Tag(" "), Key("Key"), and Body("Body")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
diff --git a/bdd/src/main/resources/message/MessageSize.feature b/bdd/src/main/resources/message/MessageSize.feature
new file mode 100644
index 0000000..7960d34
--- /dev/null
+++ b/bdd/src/main/resources/message/MessageSize.feature
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+Feature: Test message size
+
+  Scenario Outline: Send normal/transaction messages with the body size of 4M+1, expect send failed
+    Given Create a "<MessageType>" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Tag("random-tag"), and Body("size:4M+1")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+    Examples:
+      | MessageType   |
+      |   Normal      |
+      | Transaction   |
+
+  Scenario: Send delay messages with the body size of 4M+1, expect send failed
+    Given Create a "delay" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Tag("random-tag"), Body("size:4M+1"), deliveryTimestamp("10L")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Send FIFO messages with the body size of 4M+1, expect send failed
+    Given Create a "FIFO" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Tag("random-tag"), Body("size:4M+1"), messageGroup("a")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario Outline: Send normal/transaction messages with the body size of 4M, expect send and consume success
+    Given Create a "<MessageType>" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Tag("random-tag"), and Body("size:4M")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+    Examples:
+      | MessageType   |
+      |   Normal      |
+      | Transaction   |
+
+  Scenario: Send delay messages with the body size of 4M, expect send and consume success
+    Given Create a "delay" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Tag("random-tag"), Body("size:4M"), deliveryTimestamp("10s")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Send FIFO messages with the body size of 4M, expect send and consume success
+    Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Tag("random-tag"), Body("size:4M"), messageGroup("a")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Send normal messages with the body size of 4M and the property size of 16KB, expect send and consume success
+    Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Body("size:4M"), and messageProperty("size:16kB")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Send FIFO messages with the body size of 4M and the property size of 16KB, expect send and consume success
+    Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Body("size:4M"), messageGroup("a"), and messageProperty("size:16kB")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+
+
+
+
+
+
+
+
+
diff --git a/bdd/src/main/resources/message/MessageTag.feature b/bdd/src/main/resources/message/MessageTag.feature
new file mode 100644
index 0000000..7c1692d
--- /dev/null
+++ b/bdd/src/main/resources/message/MessageTag.feature
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+Feature: Test message tag
+
+  Scenario: Message Tag beyond 16KB, expect throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Tag("size:16kB+1"), and Body("random-body")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Message Tag equals 16KB, expect send and consume success
+    Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Tag("size:16kB"), and Body("random-body")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Message Tag contains invisible characters \u0000, expect throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Tag("\u0000"), and Body("random-body")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Message Tag contains | , expect throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Tag("tag|"), and Body("random-body")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Message Tag contains Chinese, expect send and consume success
+    Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Tag("中文字符"), and Body("random-body")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+
+
+
diff --git a/bdd/src/main/resources/message/MessageUserProperty.feature b/bdd/src/main/resources/message/MessageUserProperty.feature
new file mode 100644
index 0000000..8828e00
--- /dev/null
+++ b/bdd/src/main/resources/message/MessageUserProperty.feature
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+Feature: Test message property
+
+  Scenario: Message property beyond limit 128 ,expect throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Body("random-body"), and messageProperty("random-messageProperty")
+    And Set message "messageProperty" "129" times
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: The number of message properties equals limit 128, expect send and consume success
+    Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Body("random-body"), and messageProperty("random-messageProperty")
+    And Set message "messageProperty" "128" times
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Message property equals 16KB, expect send and consume success
+    Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Body("random-body"), and messageProperty("size:16kB")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+
+  Scenario: Message property beyond 16KB, expect throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Body("random-body"), and messageProperty("size:16kB+1")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario Outline: Message property contains invisible character \u0000 / use SystemKey UNIQ_KEY ,expect throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), and messageProperty("<KeyContent>", "<ValueContent>")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+    Examples:
+      | KeyContent | ValueContent |
+      | \u0000     | value        |
+      | UNIQ_KEY   | value        |
+
+  Scenario: Message property ,key and tag beyond 16KB ,expect throw exception
+    Given Create a "Normal" topic:"random-topic" if not exist
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Tag("size:4kB"), Key("size:4kB"), Value("size:4kB"), Body("size:4M"), msgKey("size:4kB+1")
+    And  Send "a" messages "synchronous"
+    Then Check exceptions can be thrown
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Message property ,key and tag equals 16KB, expect send and consume success
+    Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Tag("size:4kB"), Key("size:4kB"), Value("size:4kB"), Body("size:4M"), msgKey("size:4kB")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Message property ,key and tag equals 64B, expect send and consume success
+    Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a message, including the Topic("random-topic"), Tag("size:64B"), Key("size:64B"), Value("size:64B"), Body("size:64B"), msgKey("size:64B")
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+  Scenario: Message property is the visible character, expect send and consume success
+    Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
+    When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
+    And Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
+    And Create a message, including the Topic("random-topic"), Tag("random-tag"), Key("中文"), Value("中文"), Body("random-body"), msgKey("random-msgkey")
+    And Set messageProperty "Key" to "_" and "Value" to "_"
+    And Set messageProperty "Key" to "%" and "Value" to "%"
+    And Set messageProperty "Key" to "。" and "Value" to "。"
+    And Set messageProperty "Key" to "|" and "Value" to "|"
+    And Set messageProperty "Key" to "&&" and "Value" to "&&"
+    And Set messageProperty "Key" to "🏷" and "Value" to "🏷"
+    And  Send "a" messages "synchronous"
+    Then  Check all messages that can be consumed within 60s
+    And Shutdown the producer and consumer if they are started
+
+
+
+
+
+
+
diff --git a/bdd/src/main/resources/normal.feature b/bdd/src/main/resources/normal.feature
index 550e7b0..0ba6c3d 100644
--- a/bdd/src/main/resources/normal.feature
+++ b/bdd/src/main/resources/normal.feature
@@ -17,12 +17,12 @@
 
   Scenario Outline:  10 normal messages are sent synchronously and are expected to be received
     Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
-    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpression("TagA"), Topic("random-topic"), MessageListener("default")
+    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
     And Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
-    Then Create a message, including the Topic("random-topic"), SubscriptionExpression("TagA"), Key("Key"), and Body("Body")
+    Then Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body")
     And  Send "10" messages "<TransmissionMode>"
     Then Check all messages that can be consumed within 60s
-    And Shutdown the producer and consumer
+    And Shutdown the producer and consumer if they are started
 
     Examples:
       | TransmissionMode |
diff --git a/bdd/src/main/resources/order.feature b/bdd/src/main/resources/order.feature
index e1e12c4..a224336 100644
--- a/bdd/src/main/resources/order.feature
+++ b/bdd/src/main/resources/order.feature
@@ -17,10 +17,10 @@
 
   Scenario:  Send 100 messages with in 2 MessageGroup, expect consumed all messages orderly
     Given Create a "FIFO" topic:"random-topic" if not exist, a "Orderly" group:"random-group"
-    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpression("TagA"), Topic("random-topic"), MessageListener("default")
+    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
     And Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
-    Then Create a message, including the Topic("random-topic"), SubscriptionExpression("TagA"), Key("Key"), and Body("Body")
+    Then Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body")
     And  A total of 100 messages are sent "<TransmissionMode>" to the 2 MessageGroups in turn
     Then  Check all messages that can be consumed within 60s
     And Check each MessageGroup consumes up to 50 messages separately and is consumed orderly
-    And Shutdown the producer and consumer
+    And Shutdown the producer and consumer if they are started
diff --git a/bdd/src/main/resources/transaction.feature b/bdd/src/main/resources/transaction.feature
index 733f601..f4e13eb 100644
--- a/bdd/src/main/resources/transaction.feature
+++ b/bdd/src/main/resources/transaction.feature
@@ -17,15 +17,15 @@
 
   Scenario Outline:  10 transaction messages are sent synchronously and are expected to be received
     Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
-    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpression("TagA"), Topic("random-topic"), MessageListener("default")
+    When Create a PushConsumer, set the Endpoint("127.0.0.1:9876"), ConsumerGroup("random-group"), Tag("TagA"), Topic("random-topic"), MessageListener("default")
     And Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic"), TransactionChecker:("<TransactionChecker>")
     And  Create a transaction branch
-    Then Create a message, including the Topic("random-topic"), SubscriptionExpression("TagA"), Key("Key"), and Body("Body")
+    Then Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body")
     And  Send a half message
     And  Execute transaction:"<TransactionExecutor>"
     Then Check all messages send "success"
     And Check all messages that can be consumed within 60s
-    And Shutdown the producer and consumer
+    And Shutdown the producer and consumer if they are started
 
     Examples:
       | TransactionChecker | TransactionExecutor |
diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/client/message/MessageTagTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/client/message/MessageTagTest.java
index 07e6c2a..594a1e0 100644
--- a/java/e2e/src/test/java/org/apache/rocketmq/broker/client/message/MessageTagTest.java
+++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/client/message/MessageTagTest.java
@@ -112,20 +112,6 @@
     }
 
     @Test
-    @DisplayName("Message Tag and User Property beyond 16KB ,expect throw exception")
-    public void testMessageUserPropertyAndTagBeyond16KB() {
-        producer = ProducerFactory.getRMQProducer(account, topic);
-        String tag = RandomStringUtils.randomAlphabetic(16 * 1024 + 1);
-        String body = RandomStringUtils.randomAlphabetic(64);
-
-        Assertions.assertNotNull(producer);
-        assertThrows(Exception.class, () -> {
-            Message message = MessageFactory.buildMessage(topic, tag, body);
-            producer.getProducer().send(message);
-        }, " message tag and user property beyond 16KB ,expect throw exception but it didn't");
-    }
-
-    @Test
     @DisplayName("Message Tag contains | , expect throw exception")
     public void testMessageTagContentWith() {
         String tag = "tag|";