will, retain
diff --git a/README.md b/README.md
index 042d93e..eae4ec8 100644
--- a/README.md
+++ b/README.md
@@ -85,7 +85,7 @@
 The mqtt-example module has written basic usage example code, which can be used for reference
 
 ## Protocol Version
-The currently supported protocol version is [MQTT 3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.pdf), but the will and retain features are not supported yet
+The currently supported protocol version is [MQTT 3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.pdf).
 
 ## Authentication
 At present, an implementation based on the HmacSHA1 signature algorithm is provided by default, Refer to **AuthManagerSample**. Users can customize other implementations to meet the needs of businesses to flexibly verify resources and identities.
diff --git a/distribution/conf/meta.conf b/distribution/conf/meta.conf
index aa4cba7..d4ee418 100644
--- a/distribution/conf/meta.conf
+++ b/distribution/conf/meta.conf
@@ -16,4 +16,3 @@
 
 selfAddress=
 membersAddress=
-maxRetainedMessageNum=10000
\ No newline at end of file
diff --git a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainConsumer.java b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainConsumer.java
index 3edf003..0f869c0 100644
--- a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainConsumer.java
+++ b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainConsumer.java
@@ -45,7 +45,7 @@
             public void connectComplete(boolean reconnect, String serverURI) {
                 System.out.println(recvClientId + " connect success to " + serverURI);
                 try {
-                    final String topicFilter[] = {firstTopic + "/r3", firstTopic + "/retainTopic1", firstTopic + "/willTopic1",};
+                    final String topicFilter[] = {firstTopic + "/r3", firstTopic + "/retainTopic/+", firstTopic + "/willTopic1",};
                     final int[] qos = {1, 1, 1};
                     mqttClient.subscribe(topicFilter, qos);
                 } catch (Exception e) {
diff --git a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainProducer.java b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainProducer.java
index 18d4aae..064c1f0 100644
--- a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainProducer.java
+++ b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainProducer.java
@@ -68,7 +68,7 @@
             e.printStackTrace();
         }
         long interval = 1000;
-        int c = 10;
+        int c = 3;
         for (int i = 0; i < c; i++) {
             String msg = "r3_" + System.currentTimeMillis() + "_" + i;
             MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
@@ -76,7 +76,7 @@
             String mqttSendTopic = firstTopic + "/r3";
             if (i >= c - 1) {
                 message.setRetained(true);
-                mqttSendTopic = firstTopic + "/retainTopic1";
+                mqttSendTopic = firstTopic + "/retainTopic/1";
             }
             mqttClient.publish(mqttSendTopic, message);
             System.out.println(now() + "send: " + mqttSendTopic + ", " + msg);
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java
index 8133773..376ab63 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java
@@ -37,9 +37,9 @@
     private int metaPort = 25000;
     private String selfAddress;
     private String membersAddress;
-    private int maxRetainedMessageNum;
+    private int maxRetainedTopicNum =  10000;
     private int electionTimeoutMs = 1000;
-    private int snapshotIntervalSecs = 1000;
+    private int snapshotIntervalSecs = 60 * 1000;
     private String raftServiceName = System.getenv("RaftServiceName");
 
     public MetaConf() throws IOException {
@@ -128,12 +128,12 @@
         this.snapshotIntervalSecs = snapshotIntervalSecs;
     }
 
-    public int getMaxRetainedMessageNum() {
-        return maxRetainedMessageNum;
+    public int getMaxRetainedTopicNum() {
+        return maxRetainedTopicNum;
     }
 
-    public void setMaxRetainedMessageNum(int maxRetainedMessageNum) {
-        this.maxRetainedMessageNum = maxRetainedMessageNum;
+    public void setMaxRetainedTopicNum(int maxRetainedTopicNum) {
+        this.maxRetainedTopicNum = maxRetainedTopicNum;
     }
 
     public String getRaftServiceName() {
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
index fb377e2..5c1b428 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
@@ -111,6 +111,9 @@
                 new LinkedBlockingQueue<>(10000),
                 new ThreadFactoryImpl("requestExecutor_"));
 
+        registerStateProcessor(new RetainedMsgStateProcessor(this, metaConf.getMaxRetainedTopicNum()));  //add retained msg processor
+        registerStateProcessor(new WillMsgStateProcessor(this));
+
         rt = RouteTable.getInstance();
         localPeerId = PeerId.parsePeer(metaConf.getSelfAddress());
         rpcServer = createRpcServer(this, localPeerId);
@@ -134,9 +137,6 @@
         CliOptions cliOptions = new CliOptions();
         this.cliService = RaftServiceFactory.createAndInitCliService(cliOptions);
         this.cliClientService = (CliClientServiceImpl) ((CliServiceImpl) this.cliService).getCliClientService();
-
-        registerStateProcessor(new RetainedMsgStateProcessor(this, metaConf.getMaxRetainedMessageNum()));  //add retained msg processor
-        registerStateProcessor(new WillMsgStateProcessor(this));
     }
 
     private void refreshLeader() {
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
index 20ac4e5..b763a29 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
@@ -100,8 +100,7 @@
             }
         } catch (Throwable t) {
             LOGGER.error("stateMachine meet critical error", t);
-            iterator.setErrorAndRollback(index - applied,
-                new Status(RaftError.ESTATEMACHINE, "StateMachine meet critical error: %s.", t.toString()));
+            //iterator.setErrorAndRollback(index - applied, new Status(RaftError.ESTATEMACHINE, "StateMachine meet critical error: %s.", t.toString()));
         }
     }
 
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
index 134f62f..e2c7c23 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
@@ -41,11 +41,11 @@
     private static Logger logger = LoggerFactory.getLogger(RetainedMsgStateProcessor.class);
     private final ConcurrentHashMap<String, Trie<String, String>> retainedMsgTopicTrie = new ConcurrentHashMap<>();  //key:firstTopic value:retained topic Trie
     private MqttRaftServer server;
-    private int maxRetainedMessageNum;
+    private int maxRetainedTopicNum;
 
-    public RetainedMsgStateProcessor(MqttRaftServer server, int maxRetainedMessageNum) {
+    public RetainedMsgStateProcessor(MqttRaftServer server, int maxRetainedTopicNum) {
         this.server = server;
-        this.maxRetainedMessageNum = maxRetainedMessageNum;
+        this.maxRetainedTopicNum = maxRetainedTopicNum;
     }
 
     @Override
@@ -65,20 +65,21 @@
             if (operation.equals("topic")) {    //return retained msg
                 return get(sm.getRocksDBEngine(), topic.getBytes(StandardCharsets.UTF_8));
             } else { //return retain msgs of matched Topic
-                if (!retainedMsgTopicTrie.containsKey(firstTopic)) {
+                String wrapTrieFirstTopic = wrapTrieFirstTopic(firstTopic);
+                if (!retainedMsgTopicTrie.containsKey(wrapTrieFirstTopic)) {
                     Trie<String, String> newTrie = new Trie<>();
-                    Response value = get(sm.getRocksDBEngine(), firstTopic.getBytes(StandardCharsets.UTF_8));
+                    Response value = get(sm.getRocksDBEngine(), wrapTrieFirstTopic.getBytes(StandardCharsets.UTF_8));
                     if (value != null && value.getData() != null) {
                         newTrie = JSON.parseObject(value.getData().toStringUtf8(), Trie.class);
                     }
-                    retainedMsgTopicTrie.put(firstTopic, newTrie);
+                    retainedMsgTopicTrie.put(wrapTrieFirstTopic, newTrie);
 
                     return Response.newBuilder()
                             .setSuccess(true)
                             .setData(ByteString.copyFrom(JSON.toJSONBytes(new ArrayList<byte[]>())))
                             .build();
                 }
-                Trie<String, String> tmpTrie = retainedMsgTopicTrie.get(firstTopic);
+                Trie<String, String> tmpTrie = retainedMsgTopicTrie.get(wrapTrieFirstTopic);
                 Set<String> matchTopics = tmpTrie.getAllPath(topic);
 
                 ArrayList<ByteString> msgResults = new ArrayList<>();
@@ -104,29 +105,27 @@
     }
 
     boolean setRetainedMsg(RocksDBEngine rocksDBEngine, String firstTopic, String topic, boolean isEmpty, byte[] msg) throws Exception {
-
+        String wrapTrieFirstTopic = wrapTrieFirstTopic(firstTopic);
         // if the trie of firstTopic doesn't exist
-        if (!retainedMsgTopicTrie.containsKey(firstTopic)) {
-            retainedMsgTopicTrie.put(TopicUtils.normalizeTopic(firstTopic), new Trie<String, String>());
+        if (!retainedMsgTopicTrie.containsKey(wrapTrieFirstTopic)) {
+            retainedMsgTopicTrie.put(wrapTrieFirstTopic, new Trie<String, String>());
         }
-
         if (isEmpty) {
             //delete from trie
             logger.info("Delete the topic {} retained message", topic);
             delete(rocksDBEngine, topic.getBytes(StandardCharsets.UTF_8));
-            Trie<String, String> trie = retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(firstTopic));
+            Trie<String, String> trie = retainedMsgTopicTrie.get(wrapTrieFirstTopic);
             if (trie != null) {
                 trie.deleteTrieNode(topic, "");
             }
-            put(rocksDBEngine, firstTopic.getBytes(StandardCharsets.UTF_8), JSON.toJSONBytes(trie));
+            put(rocksDBEngine, wrapTrieFirstTopic.getBytes(StandardCharsets.UTF_8), JSON.toJSONBytes(trie));
         } else {
             //Add to trie
-            Trie<String, String> trie = retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(firstTopic));
-            logger.info("maxRetainedMessageNum:{}", maxRetainedMessageNum);
-            if (trie.getNodePath().size() < maxRetainedMessageNum) {
+            Trie<String, String> trie = retainedMsgTopicTrie.get(wrapTrieFirstTopic);
+            if (trie.getNodePath().size() < maxRetainedTopicNum) {
                 put(rocksDBEngine, topic.getBytes(StandardCharsets.UTF_8), msg);
                 trie.addNode(topic, "", "");
-                put(rocksDBEngine, firstTopic.getBytes(StandardCharsets.UTF_8), JSON.toJSONBytes(trie));
+                put(rocksDBEngine, wrapTrieFirstTopic.getBytes(StandardCharsets.UTF_8), JSON.toJSONBytes(trie));
                 return true;
             } else {
                 return false;
@@ -135,6 +134,10 @@
         return true;
     }
 
+    private String wrapTrieFirstTopic(String firstTopic){
+        return "$"+firstTopic+"$";
+    }
+
     @Override
     public Response onWriteRequest(WriteRequest writeRequest) {
         try {