[ISSUE #44] Support shared subscription
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
index b393abe..66c9bac 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
@@ -20,6 +20,8 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.mqtt.common.model.Message;
 import org.apache.rocketmq.mqtt.common.model.PullResult;
 import org.apache.rocketmq.mqtt.common.model.Queue;
@@ -49,6 +51,19 @@
     CompletableFuture<PullResult> pullMessage(String firstTopic, Queue queue, QueueOffset queueOffset, long count);
 
     /**
+     * pop messages
+     *
+     * @param consumerGroup
+     * @param firstTopic
+     * @param queue
+     * @param count
+     * @return
+     */
+    CompletableFuture<PullResult> popMessage(String consumerGroup, String firstTopic, Queue queue, long count);
+
+    void popAck(String lmqTopic, String consumerGroup, Message message);
+
+    /**
      * pull last messages
      *
      * @param firstTopic
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
index f6e404c..cd764d7 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
@@ -23,6 +23,7 @@
 
     public static final String PLUS_SIGN = "+";
     public static final String NUMBER_SIGN = "#";
+    public static final String DOLLAR_SIGN = "$";
     public static final String COLON = ":";
 
     public static final String P2P = "/p2p/";
@@ -55,5 +56,7 @@
     public static final byte CTRL_2 = '\u0002';
 
     public static final String NOT_FOUND = "NOT_FOUND";
+    public static final String SHARED_PREFIX = DOLLAR_SIGN + "share";
+    public static final String EMPTY_SHARE_NAME = "";
 
 }
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java
index e26f780..60d4ba8 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java
@@ -24,6 +24,7 @@
 public class PullResult {
     public static final int PULL_SUCCESS = 301;
     public static final int PULL_OFFSET_MOVED = 302;
+    public static final int NO_NEW_MSG = 303;
     private int code = PULL_SUCCESS;
     private String remark;
     private List<Message> messageList;
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java
index c10921f..47b7e69 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java
@@ -51,6 +51,10 @@
         return TopicUtils.isP2pTopic(queueName);
     }
 
+    public boolean isShare() {
+        return TopicUtils.isSharedSubscription(queueName);
+    }
+
     public long getQueueId() {
         return queueId;
     }
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
index cd2d380..8aa62fb 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
@@ -85,6 +85,17 @@
         return topicFilter != null ? topicFilter.hashCode() : 0;
     }
 
+    public boolean isShare() {
+        return TopicUtils.isSharedSubscription(topicFilter);
+    }
+
+    public String getSharedName() {
+        if (!isShare()) {
+            return null;
+        }
+        return TopicUtils.getSharedName(topicFilter);
+    }
+
     public String getTopicFilter() {
         return topicFilter;
     }
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
index 166475f..4e5f07b 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.mqtt.common.util;
 
+import java.util.Arrays;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.mqtt.common.model.Constants;
 import org.apache.rocketmq.mqtt.common.model.MqttTopic;
@@ -116,6 +118,9 @@
         if (topics.startsWith(Constants.MQTT_TOPIC_DELIMITER)) {
             topics = topics.substring(1);
         }
+        if (topics.startsWith(Constants.SHARED_PREFIX)) {
+            topics = TopicUtils.getSharedTopicFilter(topics);
+        }
         String topic;
         String secondTopic = null;
         int index = topics.indexOf(Constants.MQTT_TOPIC_DELIMITER, 1);
@@ -185,4 +190,24 @@
     public static String wrapP2pLmq(String clientId) {
         return normalizeTopic(Constants.P2P + clientId);
     }
+
+    // shared subscription topic filter format: $share/{ShareName}/{filter}
+    public static boolean isSharedSubscription(String topicFilter) {
+        if (StringUtils.isEmpty(topicFilter)) {
+            return false;
+        }
+        if (!topicFilter.startsWith(Constants.SHARED_PREFIX)) {
+            return false;
+        }
+        String[] arr = topicFilter.split(Constants.MQTT_TOPIC_DELIMITER);
+        return arr.length > 2;
+    }
+
+    public static String getSharedName(String topicFilter) {
+        return topicFilter.split(Constants.MQTT_TOPIC_DELIMITER)[1];
+    }
+
+    public static String getSharedTopicFilter(String topicFilter) {
+        return topicFilter.split(Constants.MQTT_TOPIC_DELIMITER, 3)[2];
+    }
 }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java
index 218b460..9d656f1 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java
@@ -290,6 +290,9 @@
         if (!subscriptions.containsKey(subscription.getTopicFilter())) {
             return false;
         }
+        if (subscription.isShare()) {
+            return true;
+        }
         if (!sendingMessages.containsKey(subscription)) {
             sendingMessages.putIfAbsent(subscription, new ConcurrentHashMap<>(16));
         }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
index 15d4ed2..70592a3 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
@@ -21,6 +21,8 @@
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
 import org.apache.rocketmq.mqtt.common.model.Message;
 import org.apache.rocketmq.mqtt.common.model.Queue;
 import org.apache.rocketmq.mqtt.common.model.Subscription;
@@ -56,6 +58,8 @@
     @Resource
     private ConnectConf connectConf;
 
+    @Resource
+    private LmqQueueStore lmqQueueStore;
 
     public void messageArrive(Session session, Subscription subscription, Queue queue) {
         if (session == null) {
@@ -163,6 +167,9 @@
             if (subscription.isRetry()) {
                 message.setRetry(message.getRetry() + 1);
                 logger.warn("retryPush:{},{},{}", session.getClientId(), message.getMsgId(), message.getRetry());
+            } else if (subscription.isShare()) {
+                String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(message.getOriginTopic(), "/","%");
+                lmqQueueStore.popAck(lmqTopic, subscription.getSharedName(), message);
             }
         });
     }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
index 89c6a71..771d09f 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
@@ -111,7 +111,7 @@
         if (queue == null) {
             return;
         }
-        if (queue.isP2p() || queue.isRetry()) {
+        if (queue.isP2p() || queue.isRetry() || queue.isShare()) {
             return;
         }
         addLoadEvent(queue, pair.getRight());
@@ -188,6 +188,13 @@
             callbackResult(pullResult, callBackResult);
             return DONE;
         }
+
+        if (subscription.isShare()) {
+            CompletableFuture<PullResult> pullResult = lmqQueueStore.popMessage(subscription.getSharedName(), toFirstTopic(subscription), queue, count);
+            callbackResult(pullResult, callBackResult);
+            return DONE;
+        }
+
         CacheEntry cacheEntry = cache.getIfPresent(queue);
         if (cacheEntry == null) {
             StatUtil.addPv("NoPullCache", 1);
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
index 0c8bc17..7371189 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
@@ -34,6 +34,7 @@
 import org.apache.rocketmq.mqtt.common.model.Subscription;
 import org.apache.rocketmq.mqtt.common.model.WillMessage;
 import org.apache.rocketmq.mqtt.common.util.SpringUtils;
+import org.apache.rocketmq.mqtt.common.util.TopicUtils;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
 import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
 import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
@@ -393,6 +394,7 @@
                 throw new RuntimeException(
                         "invalid notifyPullMessage, subscription is null, but queue is not null," + session.getClientId());
             }
+            logger.info("session loop impl doing notifyPullMessage queueFresh.freshQueue({}, {}})", session, subscription);
             queueFresh.freshQueue(session, subscription);
             pullMessage(session, subscription, queue);
             return;
@@ -540,7 +542,6 @@
                     }
                 } else if (PullResult.PULL_OFFSET_MOVED == pullResult.getCode()) {
                     queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
-                    queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
                     session.markPersistOffsetFlag(true);
                     pullMessage(session, subscription, queue);
                 } else {
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
index a6327b8..e027fa5 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
@@ -117,6 +117,8 @@
 
     private void refresh() throws MQClientException {
         Set<String> tmp = metaPersistManager.getAllFirstTopics();
+        logger.info("Notify Manager is refreshing, all first topic is " + tmp);
+
         if (tmp == null || tmp.isEmpty()) {
             return;
         }
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index 7511fdd..3a04b2b 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -20,7 +20,12 @@
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -34,11 +39,15 @@
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
@@ -70,12 +79,14 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 @Component
 public class LmqQueueStoreManager implements LmqQueueStore {
     private static Logger logger = LoggerFactory.getLogger(LmqQueueStoreManager.class);
     private PullAPIWrapper pullAPIWrapper;
+    private MQClientInstance mQClientFactory;
     private DefaultMQPullConsumer defaultMQPullConsumer;
     private DefaultMQProducer defaultMQProducer;
     private String consumerGroup = MixAll.CID_RMQ_SYS_PREFIX + "LMQ_PULL";
@@ -93,6 +104,7 @@
         defaultMQPullConsumer.setConsumerPullTimeoutMillis(2000);
         defaultMQPullConsumer.start();
         pullAPIWrapper = defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper();
+        mQClientFactory = defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory();
 
         defaultMQProducer = MqFactory.buildDefaultMQProducer("GID_LMQ_SEND", serviceConf.getProperties());
         defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);
@@ -155,6 +167,11 @@
                             new TypeReference<Map<String, String>>() {
                             }));
         }
+
+        if (StringUtils.isNotBlank(mqMessage.getProperty(MessageConst.PROPERTY_POP_CK))) {
+            message.getUserProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, mqMessage.getProperty(MessageConst.PROPERTY_POP_CK));
+        }
+
         return message;
     }
 
@@ -453,4 +470,169 @@
 
         throw new MQClientException("The broker[" + queue.getBrokerName() + "] not exist", null);
     }
+
+    @Override
+    public CompletableFuture<PullResult> popMessage(String consumerGroup, String firstTopic, Queue queue, long count) {
+        CompletableFuture<PullResult> result = new CompletableFuture<>();
+        long start = System.currentTimeMillis();
+        PopCallback popCallback = new PopCallback() {
+            @Override
+            public void onSuccess(PopResult popResult) {
+                if (popResult == null) {
+                    logger.warn("PopResult is null, just wait retry");
+                    return;
+                }
+
+                PullResult lmqPullResult = new PullResult();
+                if (PopStatus.FOUND == popResult.getPopStatus()) {
+                    lmqPullResult.setCode(PullResult.PULL_SUCCESS);
+                    List<MessageExt> messageExtList = popResult.getMsgFoundList();
+                    if (messageExtList != null && !messageExtList.isEmpty()) {
+                        List<Message> messageList = messageExtList.stream()
+                            .map(messageExt -> toLmqMessage(queue, messageExt))
+                            .collect(Collectors.toList());
+                        lmqPullResult.setMessageList(messageList);
+                    }
+                } else {
+                    lmqPullResult.setCode(PullResult.NO_NEW_MSG);
+                }
+
+                result.complete(lmqPullResult);
+
+                long rt = System.currentTimeMillis() - start;
+                StatUtil.addInvoke("lmqPop", rt);
+                collectLmqReadWriteMatchActionRt("lmqPop", rt, true);
+                StatUtil.addPv(popResult.getPopStatus().name(), 1);
+                try {
+                    MqttMetricsCollector.collectPullStatusTps(1, popResult.getPopStatus().name());
+                } catch (Throwable e) {
+                    logger.error("collect prometheus error", e);
+                }
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                logger.error("pop message from {} error", queue, e);
+                result.completeExceptionally(e);
+                long rt = System.currentTimeMillis() - start;
+                StatUtil.addInvoke("lmqPop", rt, false);
+                collectLmqReadWriteMatchActionRt("lmqPop", rt, false);
+            }
+        };
+
+        try {
+            String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%");
+            MessageQueue firstTopicQueue = new MessageQueue(firstTopic, queue.getBrokerName(), (int) queue.getQueueId());
+            popKernelImpl(lmqTopic, firstTopicQueue, consumerGroup, ExpressionType.TAG, "*",
+                60000, count, ConsumeInitMode.MAX, false, 15000, 15000, popCallback);
+        } catch (Throwable e) {
+            result.completeExceptionally(e);
+        }
+
+        return result;
+    }
+
+    public void popKernelImpl(
+        final String lmqTopic,
+        final MessageQueue mq,
+        final String consumerGroup,
+        final String expressionType,
+        final String subExpression,
+        final long invisibleTime,
+        final long maxNums,
+        final int initMode,
+        final boolean order,
+        final long pollTime,
+        final long timeoutMillis,
+        final PopCallback popCallback
+    ) throws RemotingException, InterruptedException, MQClientException {
+        FindBrokerResult findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
+        if (null == findBrokerResult) {
+            mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+            findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
+            if (null == findBrokerResult) {
+                throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+            }
+        }
+
+        PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
+        requestHeader.setConsumerGroup(consumerGroup);
+        requestHeader.setTopic(lmqTopic);
+        requestHeader.setQueueId(mq.getQueueId());
+        requestHeader.setMaxMsgNums((int) maxNums);
+        requestHeader.setInvisibleTime(invisibleTime);
+        requestHeader.setInitMode(initMode);
+        requestHeader.setExpType(expressionType);
+        requestHeader.setExp(subExpression);
+        requestHeader.setOrder(order);
+        requestHeader.setPollTime(pollTime);
+        requestHeader.setBornTime(System.currentTimeMillis());
+
+        mQClientFactory.getMQClientAPIImpl().popMessageAsync(mq.getBrokerName(), findBrokerResult.getBrokerAddr(),
+            requestHeader, timeoutMillis, popCallback);
+    }
+
+    public void popAck(String lmqTopic, String consumerGroup, Message message) {
+        long start = System.currentTimeMillis();
+        AckCallback ackCallback = new AckCallback() {
+            @Override
+            public void onSuccess(AckResult ackResult) {
+                long rt = System.currentTimeMillis() - start;
+                StatUtil.addInvoke("lmqPopAck", rt);
+                collectLmqReadWriteMatchActionRt("lmqPopAck", rt, true);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(10);
+                    popAckKernelImpl(lmqTopic, consumerGroup, message, 15000, this);
+                } catch (Exception ignore) {
+                    logger.warn("popAck {} message error", lmqTopic, e);
+                }
+                long rt = System.currentTimeMillis() - start;
+                StatUtil.addInvoke("lmqPopAck", rt, false);
+                collectLmqReadWriteMatchActionRt("lmqPopAck", rt, false);
+            }
+        };
+
+        try {
+            popAckKernelImpl(lmqTopic, consumerGroup, message, 15000, ackCallback);
+        } catch (Exception e) {
+            logger.error("pop ack error", e);
+        }
+    }
+
+    public void popAckKernelImpl(
+        final String lmqTopic,
+        final String consumerGroup,
+        final Message message,
+        final long timeoutMillis,
+        final AckCallback ackCallback
+    ) throws RemotingException, MQBrokerException, MQClientException, InterruptedException {
+        String extraInfo = message.getUserProperty(MessageConst.PROPERTY_POP_CK);
+        String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
+
+        String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs);
+        FindBrokerResult findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false);
+        if (null == findBrokerResult) {
+            mQClientFactory.updateTopicRouteInfoFromNameServer(message.getFirstTopic());
+            findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false);
+            if (null == findBrokerResult) {
+                throw new MQClientException("The broker[" + brokerName + "] not exist", null);
+            }
+        }
+
+        int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs);
+        long queueOffset = ExtraInfoUtil.getQueueOffset(extraInfoStrs);
+
+        AckMessageRequestHeader ackMessageRequestHeader = new AckMessageRequestHeader();
+        ackMessageRequestHeader.setTopic(lmqTopic);
+        ackMessageRequestHeader.setQueueId(queueId);
+        ackMessageRequestHeader.setOffset(queueOffset);
+        ackMessageRequestHeader.setConsumerGroup(consumerGroup);
+        ackMessageRequestHeader.setExtraInfo(extraInfo);
+
+        mQClientFactory.getMQClientAPIImpl().ackMessageAsync(findBrokerResult.getBrokerAddr(), timeoutMillis, ackCallback, ackMessageRequestHeader);
+    }
 }
diff --git a/pom.xml b/pom.xml
index 15839cb..7679651 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
         <java.target.version>1.8</java.target.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <spring.version>4.3.16.RELEASE</spring.version>
-        <rocket.version>4.9.3</rocket.version>
+        <rocket.version>4.9.4</rocket.version>
         <prometheus.version>0.12.0</prometheus.version>
         <grpc-java.version>1.24.0</grpc-java.version>
         <proto-google-common-protos.version>1.17.0</proto-google-common-protos.version>