Refactor:will, retain
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
index fc07cbd..cd59680 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
@@ -20,23 +20,19 @@
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.mqtt.common.facade.LmqOffsetStore;
 import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
 import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
 import org.apache.rocketmq.mqtt.common.facade.WillMsgPersistManager;
-import org.apache.rocketmq.mqtt.common.hook.HookResult;
 import org.apache.rocketmq.mqtt.common.meta.IpUtil;
 import org.apache.rocketmq.mqtt.common.model.Constants;
-import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
 import org.apache.rocketmq.mqtt.common.model.PullResult;
 import org.apache.rocketmq.mqtt.common.model.Queue;
 import org.apache.rocketmq.mqtt.common.model.QueueOffset;
 import org.apache.rocketmq.mqtt.common.model.Subscription;
 import org.apache.rocketmq.mqtt.common.model.WillMessage;
-import org.apache.rocketmq.mqtt.common.util.MessageUtil;
 import org.apache.rocketmq.mqtt.common.util.SpringUtils;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
@@ -54,7 +50,6 @@
 
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -115,8 +110,6 @@
     private ScheduledThreadPoolExecutor persistOffsetScheduler;
     private SubscriptionPersistManager subscriptionPersistManager;
 
-    private ScheduledThreadPoolExecutor aliveService;
-    private ThreadPoolExecutor executor;
 
     /**
      * channelId->session
@@ -129,26 +122,13 @@
     private AtomicLong rid = new AtomicLong();
     private long pullIntervalMillis = 10;
 
-    private long checkAliveIntervalMillis = 5 * 1000;
-
     @PostConstruct
     public void init() {
         pullService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("pull_message_thread_"));
-        aliveService = new ScheduledThreadPoolExecutor(2, new ThreadFactoryImpl("check_alive_thread_"));
         scheduler = new ScheduledThreadPoolExecutor(2, new ThreadFactoryImpl("loop_scheduler_"));
         persistOffsetScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("persistOffset_scheduler_"));
         persistOffsetScheduler.scheduleWithFixedDelay(() -> persistAllOffset(true), 5000, 5000, TimeUnit.MILLISECONDS);
         pullService.scheduleWithFixedDelay(() -> pullLoop(), pullIntervalMillis, pullIntervalMillis, TimeUnit.MILLISECONDS);
-        aliveService.scheduleWithFixedDelay(() -> csLoop(), 15 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
-        aliveService.scheduleWithFixedDelay(() -> masterLoop(), 10 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
-
-        executor = new ThreadPoolExecutor(
-                1,
-                1,
-                1,
-                TimeUnit.MINUTES,
-                new LinkedBlockingQueue<>(5000),
-                new ThreadFactoryImpl("DispatchWillToMQ_ "));
     }
 
     private void pullLoop() {
@@ -174,168 +154,6 @@
         }
     }
 
-    private void csLoop() {
-        try {
-            String ip = IpUtil.getLocalAddressCompatible();
-            String csKey = Constants.CS_ALIVE + Constants.CTRL_1 + ip;
-            String masterKey = Constants.CS_MASTER;
-            long currentTime = System.currentTimeMillis();
-
-            willMsgPersistManager.put(csKey, String.valueOf(currentTime)).whenComplete((result, throwable) -> {
-                if (result == null || throwable != null) {
-                    logger.error("{} fail to put csKey", csKey, throwable);
-                }
-            });
-
-            willMsgPersistManager.get(masterKey).whenComplete((result, throwable) -> {
-                String content = new String(result);
-                if (Constants.NOT_FOUND.equals(content) || masterHasDown(content)) {
-                    willMsgPersistManager.compareAndPut(masterKey, content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
-                        if (!rs || tb != null) {
-                            logger.error("{} fail to update master", ip, tb);
-                            return;
-                        }
-                        logger.info("{} update master successfully", ip);
-                    });
-                }
-            });
-        } catch (Exception e) {
-            logger.error("", e);
-        }
-    }
-
-    private boolean masterHasDown(String masterValue) {
-        String[] ipTime = masterValue.split(Constants.COLON);
-        if (ipTime.length < 2) {
-            logger.error("master ip:updateTime split error, len < 2");
-            return true;
-        }
-
-        return System.currentTimeMillis() - Long.parseLong(ipTime[1]) > 10 * checkAliveIntervalMillis;
-    }
-
-    private void masterLoop() {
-        try {
-            String ip = IpUtil.getLocalAddressCompatible();
-            if (ip == null) {
-                logger.error("can not get local ip");
-                return;
-            }
-
-            willMsgPersistManager.get(Constants.CS_MASTER).whenComplete((result, throwable) -> {
-                if (result == null || throwable != null) {
-                    logger.error("fail to get CS_MASTER", throwable);
-                    return;
-                }
-
-                String content = new String(result);
-                if (Constants.NOT_FOUND.equals(content)) {
-                    // no master
-                    return;
-                }
-
-                if (!content.startsWith(ip)) {
-                    // is not master
-                    return;
-                }
-                // master keep alive
-                long currentTime = System.currentTimeMillis();
-                willMsgPersistManager.compareAndPut(Constants.CS_MASTER, content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
-                    if (!rs || tb != null) {
-                        logger.error("{} fail to update master", ip, tb);
-                    }
-                });
-
-                // master to check all cs state
-                String startCSKey = Constants.CS_ALIVE + Constants.CTRL_0;
-                String endCSKey = Constants.CS_ALIVE + Constants.CTRL_2;
-                willMsgPersistManager.scan(startCSKey, endCSKey).whenComplete((rs, tb) -> {
-                    if (rs == null || tb != null) {
-                        logger.error("{} master fail to scan cs", ip, tb);
-                        return;
-                    }
-
-                    if (rs.size() == 0) {
-                        logger.info("master scanned 0 cs");
-                        return;
-                    }
-
-                    for (Map.Entry<String, String> entry : rs.entrySet()) {
-                        String key = entry.getKey();
-                        String value = entry.getValue();
-                        logger.info("master:{} scan cs:{}, heart:{} {}", ip, key, value, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Long.parseLong(value)));
-                        if (System.currentTimeMillis() - Long.parseLong(value) > 10 * checkAliveIntervalMillis) {
-                            // the cs has down
-                            String csIp = key.substring((Constants.CS_ALIVE + Constants.CTRL_1).length());
-                            handleShutDownCS(csIp);
-
-                            willMsgPersistManager.delete(key).whenComplete((resultDel, tbDel) -> {
-                                if (!resultDel || tbDel != null) {
-                                    logger.error("fail to delete shutDown cs:{} in db", key);
-                                    return;
-                                }
-                                logger.debug("delete shutDown cs:{} in db successfully", key);
-                            });
-                        }
-                    }
-                });
-            });
-        } catch (Exception e) {
-            logger.error("", e);
-        }
-    }
-
-    private void handleShutDownCS(String ip) {
-        String startClientKey = ip + Constants.CTRL_0;
-        String endClientKey = ip + Constants.CTRL_2;
-        willMsgPersistManager.scan(startClientKey, endClientKey).whenComplete((willMap, throwable) -> {
-            if (willMap == null || throwable != null) {
-                logger.error("{} master fail to scan cs", ip, throwable);
-                return;
-            }
-
-            if (willMap.size() == 0) {
-                logger.info("the cs:{} has no will", ip);
-                return;
-            }
-
-            for (Map.Entry<String, String> entry : willMap.entrySet()) {
-                logger.info("master handle will {} {}", entry.getKey(), entry.getValue());
-                String willKey = entry.getKey();
-                String clientId = entry.getKey().substring((ip + Constants.CTRL_1).length());
-
-                WillMessage willMessage = JSON.parseObject(entry.getValue(), WillMessage.class);
-                int mqttId = mqttMsgId.nextId(clientId);
-                MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
-                        willMessage.getQos(), mqttId, willMessage.isRetain());
-                Runnable runnable = new Runnable() {
-                    @Override
-                    public void run() {
-                        CompletableFuture<HookResult> upstreamHookResult = publishProcessor.process(new MqttMessageUpContext(), mqttMessage);
-                        upstreamHookResult.whenComplete((hookResult, tb) -> {
-                            try {
-                                if (!hookResult.isSuccess()) {
-                                    executor.submit(this);
-                                } else {
-                                    willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
-                                        if (!resultDel || tbDel != null) {
-                                            logger.error("fail to delete will message key:{}", willKey);
-                                            return;
-                                        }
-                                        logger.info("delete will message key {} successfully", willKey);
-                                    });
-                                }
-                            } catch (Throwable t) {
-                                logger.error("", t);
-                            }
-                        });
-                    }
-                };
-                executor.submit(runnable);
-            }
-        });
-    }
-
     @Override
     public void setChannelManager(ChannelManager channelManager) {
         this.channelManager = channelManager;
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java
new file mode 100644
index 0000000..e27bb47
--- /dev/null
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.session.loop;
+
+import com.alibaba.fastjson.JSON;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.mqtt.common.facade.WillMsgPersistManager;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.common.meta.IpUtil;
+import org.apache.rocketmq.mqtt.common.model.Constants;
+import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
+import org.apache.rocketmq.mqtt.common.model.WillMessage;
+import org.apache.rocketmq.mqtt.common.util.MessageUtil;
+import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
+import org.apache.rocketmq.mqtt.ds.upstream.processor.PublishProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class WillLoop {
+    private static Logger logger = LoggerFactory.getLogger(WillLoop.class);
+    private ScheduledThreadPoolExecutor aliveService = new ScheduledThreadPoolExecutor(2, new ThreadFactoryImpl("check_alive_thread_"));
+    private long checkAliveIntervalMillis = 5 * 1000;
+    private ThreadPoolExecutor executor;
+
+    @Resource
+    private WillMsgPersistManager willMsgPersistManager;
+
+    @Resource
+    private MqttMsgId mqttMsgId;
+
+    @Resource
+    private PublishProcessor publishProcessor;
+
+    @PostConstruct
+    public void init() {
+        aliveService.scheduleWithFixedDelay(() -> csLoop(), 15 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
+        aliveService.scheduleWithFixedDelay(() -> masterLoop(), 10 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
+
+        executor = new ThreadPoolExecutor(
+                1,
+                1,
+                1,
+                TimeUnit.MINUTES,
+                new LinkedBlockingQueue<>(5000),
+                new ThreadFactoryImpl("DispatchWillToMQ_ "));
+    }
+
+    private void csLoop() {
+        try {
+            String ip = IpUtil.getLocalAddressCompatible();
+            String csKey = Constants.CS_ALIVE + Constants.CTRL_1 + ip;
+            String masterKey = Constants.CS_MASTER;
+            long currentTime = System.currentTimeMillis();
+
+            willMsgPersistManager.put(csKey, String.valueOf(currentTime)).whenComplete((result, throwable) -> {
+                if (result == null || throwable != null) {
+                    logger.error("{} fail to put csKey", csKey, throwable);
+                }
+            });
+
+            willMsgPersistManager.get(masterKey).whenComplete((result, throwable) -> {
+                String content = new String(result);
+                if (Constants.NOT_FOUND.equals(content) || masterHasDown(content)) {
+                    willMsgPersistManager.compareAndPut(masterKey, content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
+                        if (!rs || tb != null) {
+                            logger.error("{} fail to update master", ip, tb);
+                            return;
+                        }
+                        logger.info("{} update master successfully", ip);
+                    });
+                }
+            });
+        } catch (Exception e) {
+            logger.error("", e);
+        }
+    }
+
+    private boolean masterHasDown(String masterValue) {
+        String[] ipTime = masterValue.split(Constants.COLON);
+        if (ipTime.length < 2) {
+            logger.error("master ip:updateTime split error, len < 2");
+            return true;
+        }
+
+        return System.currentTimeMillis() - Long.parseLong(ipTime[1]) > 10 * checkAliveIntervalMillis;
+    }
+
+    private void masterLoop() {
+        try {
+            String ip = IpUtil.getLocalAddressCompatible();
+            if (ip == null) {
+                logger.error("can not get local ip");
+                return;
+            }
+
+            willMsgPersistManager.get(Constants.CS_MASTER).whenComplete((result, throwable) -> {
+                if (result == null || throwable != null) {
+                    logger.error("fail to get CS_MASTER", throwable);
+                    return;
+                }
+
+                String content = new String(result);
+                if (Constants.NOT_FOUND.equals(content)) {
+                    // no master
+                    return;
+                }
+
+                if (!content.startsWith(ip)) {
+                    // is not master
+                    return;
+                }
+                // master keep alive
+                long currentTime = System.currentTimeMillis();
+                willMsgPersistManager.compareAndPut(Constants.CS_MASTER, content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
+                    if (!rs || tb != null) {
+                        logger.error("{} fail to update master", ip, tb);
+                    }
+                });
+
+                // master to check all cs state
+                String startCSKey = Constants.CS_ALIVE + Constants.CTRL_0;
+                String endCSKey = Constants.CS_ALIVE + Constants.CTRL_2;
+                willMsgPersistManager.scan(startCSKey, endCSKey).whenComplete((rs, tb) -> {
+                    if (rs == null || tb != null) {
+                        logger.error("{} master fail to scan cs", ip, tb);
+                        return;
+                    }
+
+                    if (rs.size() == 0) {
+                        logger.info("master scanned 0 cs");
+                        return;
+                    }
+
+                    for (Map.Entry<String, String> entry : rs.entrySet()) {
+                        String key = entry.getKey();
+                        String value = entry.getValue();
+                        logger.info("master:{} scan cs:{}, heart:{} {}", ip, key, value, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Long.parseLong(value)));
+                        if (System.currentTimeMillis() - Long.parseLong(value) > 10 * checkAliveIntervalMillis) {
+                            // the cs has down
+                            String csIp = key.substring((Constants.CS_ALIVE + Constants.CTRL_1).length());
+                            handleShutDownCS(csIp);
+
+                            willMsgPersistManager.delete(key).whenComplete((resultDel, tbDel) -> {
+                                if (!resultDel || tbDel != null) {
+                                    logger.error("fail to delete shutDown cs:{} in db", key);
+                                    return;
+                                }
+                                logger.debug("delete shutDown cs:{} in db successfully", key);
+                            });
+                        }
+                    }
+                });
+            });
+        } catch (Exception e) {
+            logger.error("", e);
+        }
+    }
+
+    private void handleShutDownCS(String ip) {
+        String startClientKey = ip + Constants.CTRL_0;
+        String endClientKey = ip + Constants.CTRL_2;
+        willMsgPersistManager.scan(startClientKey, endClientKey).whenComplete((willMap, throwable) -> {
+            if (willMap == null || throwable != null) {
+                logger.error("{} master fail to scan cs", ip, throwable);
+                return;
+            }
+
+            if (willMap.size() == 0) {
+                logger.info("the cs:{} has no will", ip);
+                return;
+            }
+
+            for (Map.Entry<String, String> entry : willMap.entrySet()) {
+                logger.info("master handle will {} {}", entry.getKey(), entry.getValue());
+                String willKey = entry.getKey();
+                String clientId = entry.getKey().substring((ip + Constants.CTRL_1).length());
+
+                WillMessage willMessage = JSON.parseObject(entry.getValue(), WillMessage.class);
+                int mqttId = mqttMsgId.nextId(clientId);
+                MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
+                        willMessage.getQos(), mqttId, willMessage.isRetain());
+                Runnable runnable = new Runnable() {
+                    @Override
+                    public void run() {
+                        CompletableFuture<HookResult> upstreamHookResult = publishProcessor.process(new MqttMessageUpContext(), mqttMessage);
+                        upstreamHookResult.whenComplete((hookResult, tb) -> {
+                            try {
+                                if (!hookResult.isSuccess()) {
+                                    executor.submit(this);
+                                } else {
+                                    willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
+                                        if (!resultDel || tbDel != null) {
+                                            logger.error("fail to delete will message key:{}", willKey);
+                                            return;
+                                        }
+                                        logger.info("delete will message key {} successfully", willKey);
+                                    });
+                                }
+                            } catch (Throwable t) {
+                                logger.error("", t);
+                            }
+                        });
+                    }
+                };
+                executor.submit(runnable);
+            }
+        });
+    }
+
+}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
index 5e8cd38..20ac4e5 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
@@ -118,16 +118,15 @@
     public Message parseMessage(byte[] bytes) throws Exception {
         Message result;
         try {
-            result = ReadRequest.parseFrom(bytes);
-            return result;
-        } catch (Throwable ignore) {
-        }
-        try {
             result = WriteRequest.parseFrom(bytes);
             return result;
         } catch (Throwable ignore) {
         }
-
+        try {
+            result = ReadRequest.parseFrom(bytes);
+            return result;
+        } catch (Throwable ignore) {
+        }
         throw new Exception("parse message from bytes error");
     }
 
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
index 5573cdb..81690c6 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.mqtt.meta.raft.processor;
 
+import com.alipay.sofa.jraft.util.BytesUtil;
 import com.google.protobuf.ByteString;
 import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
 import org.apache.rocketmq.mqtt.common.model.consistency.Response;
@@ -24,9 +25,13 @@
 import org.apache.rocketmq.mqtt.common.meta.Constants;
 import org.apache.rocketmq.mqtt.meta.rocksdb.RocksDBEngine;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.locks.Lock;
 
 /**
@@ -83,7 +88,6 @@
         writeLock.lock();
         try {
             rocksDBEngine.getRdb().put(rocksDBEngine.getWriteOptions(), key, value);
-
             return Response.newBuilder()
                     .setSuccess(true)
                     .build();
@@ -100,7 +104,6 @@
         writeLock.lock();
         try {
             rocksDBEngine.getRdb().delete(rocksDBEngine.getWriteOptions(), key);
-
             return Response.newBuilder()
                     .setSuccess(true)
                     .build();
@@ -112,4 +115,58 @@
         }
     }
 
+    public Response compareAndPut(RocksDBEngine rocksDBEngine, byte[] key, byte[] expectValue, byte[] updateValue) throws Exception {
+        final Lock writeLock = rocksDBEngine.getReadWriteLock().writeLock();
+        writeLock.lock();
+        try {
+            final byte[] actual = rocksDBEngine.getRdb().get(key);
+            if (Arrays.equals(expectValue, actual)) {
+                rocksDBEngine.getRdb().put(rocksDBEngine.getWriteOptions(), key, updateValue);
+                return Response.newBuilder()
+                        .setSuccess(true)
+                        .build();
+            } else {
+                return Response.newBuilder()
+                        .setSuccess(false)
+                        .build();
+            }
+        } catch (final Exception e) {
+            logger.error("Fail to delete, k {}", key, e);
+            throw e;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public Response scan(RocksDBEngine rocksDBEngine, byte[] startKey, byte[] endKey) throws Exception {
+        Map<String, String> result = new HashMap<>();
+        final Lock readLock = rocksDBEngine.getReadWriteLock().readLock();
+        readLock.lock();
+        try {
+            final RocksIterator it = rocksDBEngine.getRdb().newIterator();
+            if (startKey == null) {
+                it.seekToFirst();
+            } else {
+                it.seek(startKey);
+            }
+            while (it.isValid()) {
+                final byte[] key = it.key();
+                if (endKey != null && BytesUtil.compare(key, endKey) >= 0) {
+                    break;
+                }
+                result.put(new String(key), new String(it.value()));
+                it.next();
+            }
+            return Response.newBuilder()
+                    .setSuccess(true)
+                    .putAllDataMap(result)
+                    .build();
+        } catch (final Exception e) {
+            logger.error("Fail to delete, startKey {}, endKey {}", startKey, endKey, e);
+            throw e;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
 }
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java
index 17f432d..5376a99 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java
@@ -17,23 +17,15 @@
 
 package org.apache.rocketmq.mqtt.meta.raft.processor;
 
-import com.alipay.sofa.jraft.util.BytesUtil;
+import org.apache.rocketmq.mqtt.common.meta.Constants;
 import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
 import org.apache.rocketmq.mqtt.common.model.consistency.Response;
 import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
 import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
 import org.apache.rocketmq.mqtt.meta.raft.MqttStateMachine;
-import org.apache.rocketmq.mqtt.common.meta.Constants;
-import org.apache.rocketmq.mqtt.meta.rocksdb.RocksDBEngine;
-import org.rocksdb.RocksIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.locks.Lock;
-
 import static org.apache.rocketmq.mqtt.common.meta.Constants.CATEGORY_WILL_MSG;
 
 public class WillMsgStateProcessor extends StateProcessor {
@@ -108,62 +100,4 @@
         return CATEGORY_WILL_MSG;
     }
 
-    public Response compareAndPut(RocksDBEngine rocksDBEngine, byte[] key, byte[] expectValue, byte[] updateValue) throws Exception {
-        final Lock writeLock = rocksDBEngine.getReadWriteLock().writeLock();
-        writeLock.lock();
-        try {
-            final byte[] actual = rocksDBEngine.getRdb().get(key);
-            if (Arrays.equals(expectValue, actual)) {
-                rocksDBEngine.getRdb().put(rocksDBEngine.getWriteOptions(), key, updateValue);
-                return Response.newBuilder()
-                        .setSuccess(true)
-                        .build();
-            } else {
-                return Response.newBuilder()
-                        .setSuccess(false)
-                        .build();
-            }
-        } catch (final Exception e) {
-            logger.error("Fail to delete, k {}", key, e);
-            throw e;
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    public Response scan(RocksDBEngine rocksDBEngine, byte[] startKey, byte[] endKey) throws Exception {
-        Map<String, String> result = new HashMap<>();
-
-        final Lock readLock = rocksDBEngine.getReadWriteLock().readLock();
-        readLock.lock();
-        try {
-
-            final RocksIterator it = rocksDBEngine.getRdb().newIterator();
-            if (startKey == null) {
-                it.seekToFirst();
-            } else {
-                it.seek(startKey);
-            }
-
-            while (it.isValid()) {
-                final byte[] key = it.key();
-                if (endKey != null && BytesUtil.compare(key, endKey) >= 0) {
-                    break;
-                }
-                result.put(new String(key), new String(it.value()));
-                it.next();
-            }
-
-            return Response.newBuilder()
-                    .setSuccess(true)
-                    .putAllDataMap(result)
-                    .build();
-        } catch (final Exception e) {
-            logger.error("Fail to delete, startKey {}, endKey {}", startKey, endKey, e);
-            throw e;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
 }
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/rocksdb/RocksDBSnapshot.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/rocksdb/RocksDBSnapshot.java
index 7d57590..be0ff7d 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/rocksdb/RocksDBSnapshot.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/rocksdb/RocksDBSnapshot.java
@@ -78,7 +78,7 @@
         }
     }
 
-    public void writeSnapshot(final String snapshotPath) {
+    private void writeSnapshot(final String snapshotPath) {
         Lock lock = rocksDBEngine.getReadWriteLock().writeLock();
         lock.lock();
         try (final Checkpoint checkpoint = Checkpoint.create(rocksDBEngine.getRdb())) {
@@ -98,7 +98,7 @@
         }
     }
 
-    public void compressSnapshot(final SnapshotWriter writer, final LocalFileMetaOutter.LocalFileMeta.Builder metaBuilder, final Closure done) {
+    private void compressSnapshot(final SnapshotWriter writer, final LocalFileMetaOutter.LocalFileMeta.Builder metaBuilder, final Closure done) {
         final String writerPath = writer.getPath();
         final String outputFile = Paths.get(writerPath, SNAPSHOT_ARCHIVE).toString();
         try {
@@ -116,7 +116,7 @@
         }
     }
 
-    public void decompressSnapshot(final String readerPath, final LocalFileMetaOutter.LocalFileMeta meta) throws Throwable {
+    private void decompressSnapshot(final String readerPath, final LocalFileMetaOutter.LocalFileMeta meta) throws Throwable {
         final String sourceFile = Paths.get(readerPath, SNAPSHOT_ARCHIVE).toString();
         final Checksum checksum = new CRC64();
         ZipStrategyManager.getDefault().deCompress(sourceFile, readerPath, checksum);
@@ -125,7 +125,7 @@
         }
     }
 
-    public void readSnapshot(final String snapshotPath) {
+    private void readSnapshot(final String snapshotPath) {
         Lock lock = rocksDBEngine.getReadWriteLock().readLock();
         lock.lock();
         try {
@@ -149,7 +149,7 @@
         }
     }
 
-    public LocalFileMetaOutter.LocalFileMeta.Builder writeMetadata(final LocalFileMetaOutter.LocalFileMeta metadata) {
+    private LocalFileMetaOutter.LocalFileMeta.Builder writeMetadata(final LocalFileMetaOutter.LocalFileMeta metadata) {
         if (metadata == null) {
             return LocalFileMetaOutter.LocalFileMeta.newBuilder();
         }
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/util/RaftExecutor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/util/RaftExecutor.java
deleted file mode 100644
index a6cfca2..0000000
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/util/RaftExecutor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.mqtt.meta.util;
-
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-
-/**
- * raft executor.
- */
-public final class RaftExecutor {
-    
-    private static ExecutorService raftSnapshotExecutor;
-    
-    private RaftExecutor() {
-        init();
-    }
-    
-    public static void init() {
-        raftSnapshotExecutor = new ScheduledThreadPoolExecutor(2, new ThreadFactoryImpl("loop_snapshot"));
-    }
-    
-    public static void doSnapshot(Runnable runnable) {
-        raftSnapshotExecutor.execute(runnable);
-    }
-    
-}