[ISSUE #44] Support shared subscription
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
index b393abe..66c9bac 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
@@ -20,6 +20,8 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.PullResult;
import org.apache.rocketmq.mqtt.common.model.Queue;
@@ -49,6 +51,19 @@
CompletableFuture<PullResult> pullMessage(String firstTopic, Queue queue, QueueOffset queueOffset, long count);
/**
+ * pop messages
+ *
+ * @param consumerGroup
+ * @param firstTopic
+ * @param queue
+ * @param count
+ * @return
+ */
+ CompletableFuture<PullResult> popMessage(String consumerGroup, String firstTopic, Queue queue, long count);
+
+ void popAck(String lmqTopic, String consumerGroup, Message message);
+
+ /**
* pull last messages
*
* @param firstTopic
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
index f6e404c..cd764d7 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
@@ -23,6 +23,7 @@
public static final String PLUS_SIGN = "+";
public static final String NUMBER_SIGN = "#";
+ public static final String DOLLAR_SIGN = "$";
public static final String COLON = ":";
public static final String P2P = "/p2p/";
@@ -55,5 +56,7 @@
public static final byte CTRL_2 = '\u0002';
public static final String NOT_FOUND = "NOT_FOUND";
+ public static final String SHARED_PREFIX = DOLLAR_SIGN + "share";
+ public static final String EMPTY_SHARE_NAME = "";
}
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java
index e26f780..60d4ba8 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java
@@ -24,6 +24,7 @@
public class PullResult {
public static final int PULL_SUCCESS = 301;
public static final int PULL_OFFSET_MOVED = 302;
+ public static final int NO_NEW_MSG = 303;
private int code = PULL_SUCCESS;
private String remark;
private List<Message> messageList;
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java
index c10921f..47b7e69 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java
@@ -51,6 +51,10 @@
return TopicUtils.isP2pTopic(queueName);
}
+ public boolean isShare() {
+ return TopicUtils.isSharedSubscription(queueName);
+ }
+
public long getQueueId() {
return queueId;
}
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
index cd2d380..8aa62fb 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
@@ -85,6 +85,17 @@
return topicFilter != null ? topicFilter.hashCode() : 0;
}
+ public boolean isShare() {
+ return TopicUtils.isSharedSubscription(topicFilter);
+ }
+
+ public String getSharedName() {
+ if (!isShare()) {
+ return null;
+ }
+ return TopicUtils.getSharedName(topicFilter);
+ }
+
public String getTopicFilter() {
return topicFilter;
}
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
index 166475f..4e5f07b 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.mqtt.common.util;
+import java.util.Arrays;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.mqtt.common.model.Constants;
import org.apache.rocketmq.mqtt.common.model.MqttTopic;
@@ -116,6 +118,9 @@
if (topics.startsWith(Constants.MQTT_TOPIC_DELIMITER)) {
topics = topics.substring(1);
}
+ if (topics.startsWith(Constants.SHARED_PREFIX)) {
+ topics = TopicUtils.getSharedTopicFilter(topics);
+ }
String topic;
String secondTopic = null;
int index = topics.indexOf(Constants.MQTT_TOPIC_DELIMITER, 1);
@@ -185,4 +190,24 @@
public static String wrapP2pLmq(String clientId) {
return normalizeTopic(Constants.P2P + clientId);
}
+
+ // shared subscription topic filter format: $share/{ShareName}/{filter}
+ public static boolean isSharedSubscription(String topicFilter) {
+ if (StringUtils.isEmpty(topicFilter)) {
+ return false;
+ }
+ if (!topicFilter.startsWith(Constants.SHARED_PREFIX)) {
+ return false;
+ }
+ String[] arr = topicFilter.split(Constants.MQTT_TOPIC_DELIMITER);
+ return arr.length > 2;
+ }
+
+ public static String getSharedName(String topicFilter) {
+ return topicFilter.split(Constants.MQTT_TOPIC_DELIMITER)[1];
+ }
+
+ public static String getSharedTopicFilter(String topicFilter) {
+ return topicFilter.split(Constants.MQTT_TOPIC_DELIMITER, 3)[2];
+ }
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java
index 218b460..9d656f1 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java
@@ -290,6 +290,9 @@
if (!subscriptions.containsKey(subscription.getTopicFilter())) {
return false;
}
+ if (subscription.isShare()) {
+ return true;
+ }
if (!sendingMessages.containsKey(subscription)) {
sendingMessages.putIfAbsent(subscription, new ConcurrentHashMap<>(16));
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
index 15d4ed2..70592a3 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
@@ -21,6 +21,8 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.Subscription;
@@ -56,6 +58,8 @@
@Resource
private ConnectConf connectConf;
+ @Resource
+ private LmqQueueStore lmqQueueStore;
public void messageArrive(Session session, Subscription subscription, Queue queue) {
if (session == null) {
@@ -163,6 +167,9 @@
if (subscription.isRetry()) {
message.setRetry(message.getRetry() + 1);
logger.warn("retryPush:{},{},{}", session.getClientId(), message.getMsgId(), message.getRetry());
+ } else if (subscription.isShare()) {
+ String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(message.getOriginTopic(), "/","%");
+ lmqQueueStore.popAck(lmqTopic, subscription.getSharedName(), message);
}
});
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
index 89c6a71..771d09f 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
@@ -111,7 +111,7 @@
if (queue == null) {
return;
}
- if (queue.isP2p() || queue.isRetry()) {
+ if (queue.isP2p() || queue.isRetry() || queue.isShare()) {
return;
}
addLoadEvent(queue, pair.getRight());
@@ -188,6 +188,13 @@
callbackResult(pullResult, callBackResult);
return DONE;
}
+
+ if (subscription.isShare()) {
+ CompletableFuture<PullResult> pullResult = lmqQueueStore.popMessage(subscription.getSharedName(), toFirstTopic(subscription), queue, count);
+ callbackResult(pullResult, callBackResult);
+ return DONE;
+ }
+
CacheEntry cacheEntry = cache.getIfPresent(queue);
if (cacheEntry == null) {
StatUtil.addPv("NoPullCache", 1);
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
index 0c8bc17..7371189 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
@@ -34,6 +34,7 @@
import org.apache.rocketmq.mqtt.common.model.Subscription;
import org.apache.rocketmq.mqtt.common.model.WillMessage;
import org.apache.rocketmq.mqtt.common.util.SpringUtils;
+import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
@@ -393,6 +394,7 @@
throw new RuntimeException(
"invalid notifyPullMessage, subscription is null, but queue is not null," + session.getClientId());
}
+ logger.info("session loop impl doing notifyPullMessage queueFresh.freshQueue({}, {}})", session, subscription);
queueFresh.freshQueue(session, subscription);
pullMessage(session, subscription, queue);
return;
@@ -540,7 +542,6 @@
}
} else if (PullResult.PULL_OFFSET_MOVED == pullResult.getCode()) {
queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
- queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
session.markPersistOffsetFlag(true);
pullMessage(session, subscription, queue);
} else {
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
index a6327b8..e027fa5 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
@@ -117,6 +117,8 @@
private void refresh() throws MQClientException {
Set<String> tmp = metaPersistManager.getAllFirstTopics();
+ logger.info("Notify Manager is refreshing, all first topic is " + tmp);
+
if (tmp == null || tmp.isEmpty()) {
return;
}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index 7511fdd..3a04b2b 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -20,7 +20,12 @@
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -34,11 +39,15 @@
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
@@ -70,12 +79,14 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Component
public class LmqQueueStoreManager implements LmqQueueStore {
private static Logger logger = LoggerFactory.getLogger(LmqQueueStoreManager.class);
private PullAPIWrapper pullAPIWrapper;
+ private MQClientInstance mQClientFactory;
private DefaultMQPullConsumer defaultMQPullConsumer;
private DefaultMQProducer defaultMQProducer;
private String consumerGroup = MixAll.CID_RMQ_SYS_PREFIX + "LMQ_PULL";
@@ -93,6 +104,7 @@
defaultMQPullConsumer.setConsumerPullTimeoutMillis(2000);
defaultMQPullConsumer.start();
pullAPIWrapper = defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper();
+ mQClientFactory = defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory();
defaultMQProducer = MqFactory.buildDefaultMQProducer("GID_LMQ_SEND", serviceConf.getProperties());
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);
@@ -155,6 +167,11 @@
new TypeReference<Map<String, String>>() {
}));
}
+
+ if (StringUtils.isNotBlank(mqMessage.getProperty(MessageConst.PROPERTY_POP_CK))) {
+ message.getUserProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, mqMessage.getProperty(MessageConst.PROPERTY_POP_CK));
+ }
+
return message;
}
@@ -453,4 +470,169 @@
throw new MQClientException("The broker[" + queue.getBrokerName() + "] not exist", null);
}
+
+ @Override
+ public CompletableFuture<PullResult> popMessage(String consumerGroup, String firstTopic, Queue queue, long count) {
+ CompletableFuture<PullResult> result = new CompletableFuture<>();
+ long start = System.currentTimeMillis();
+ PopCallback popCallback = new PopCallback() {
+ @Override
+ public void onSuccess(PopResult popResult) {
+ if (popResult == null) {
+ logger.warn("PopResult is null, just wait retry");
+ return;
+ }
+
+ PullResult lmqPullResult = new PullResult();
+ if (PopStatus.FOUND == popResult.getPopStatus()) {
+ lmqPullResult.setCode(PullResult.PULL_SUCCESS);
+ List<MessageExt> messageExtList = popResult.getMsgFoundList();
+ if (messageExtList != null && !messageExtList.isEmpty()) {
+ List<Message> messageList = messageExtList.stream()
+ .map(messageExt -> toLmqMessage(queue, messageExt))
+ .collect(Collectors.toList());
+ lmqPullResult.setMessageList(messageList);
+ }
+ } else {
+ lmqPullResult.setCode(PullResult.NO_NEW_MSG);
+ }
+
+ result.complete(lmqPullResult);
+
+ long rt = System.currentTimeMillis() - start;
+ StatUtil.addInvoke("lmqPop", rt);
+ collectLmqReadWriteMatchActionRt("lmqPop", rt, true);
+ StatUtil.addPv(popResult.getPopStatus().name(), 1);
+ try {
+ MqttMetricsCollector.collectPullStatusTps(1, popResult.getPopStatus().name());
+ } catch (Throwable e) {
+ logger.error("collect prometheus error", e);
+ }
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ logger.error("pop message from {} error", queue, e);
+ result.completeExceptionally(e);
+ long rt = System.currentTimeMillis() - start;
+ StatUtil.addInvoke("lmqPop", rt, false);
+ collectLmqReadWriteMatchActionRt("lmqPop", rt, false);
+ }
+ };
+
+ try {
+ String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%");
+ MessageQueue firstTopicQueue = new MessageQueue(firstTopic, queue.getBrokerName(), (int) queue.getQueueId());
+ popKernelImpl(lmqTopic, firstTopicQueue, consumerGroup, ExpressionType.TAG, "*",
+ 60000, count, ConsumeInitMode.MAX, false, 15000, 15000, popCallback);
+ } catch (Throwable e) {
+ result.completeExceptionally(e);
+ }
+
+ return result;
+ }
+
+ public void popKernelImpl(
+ final String lmqTopic,
+ final MessageQueue mq,
+ final String consumerGroup,
+ final String expressionType,
+ final String subExpression,
+ final long invisibleTime,
+ final long maxNums,
+ final int initMode,
+ final boolean order,
+ final long pollTime,
+ final long timeoutMillis,
+ final PopCallback popCallback
+ ) throws RemotingException, InterruptedException, MQClientException {
+ FindBrokerResult findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
+ if (null == findBrokerResult) {
+ mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+ findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
+ if (null == findBrokerResult) {
+ throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+ }
+ }
+
+ PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
+ requestHeader.setConsumerGroup(consumerGroup);
+ requestHeader.setTopic(lmqTopic);
+ requestHeader.setQueueId(mq.getQueueId());
+ requestHeader.setMaxMsgNums((int) maxNums);
+ requestHeader.setInvisibleTime(invisibleTime);
+ requestHeader.setInitMode(initMode);
+ requestHeader.setExpType(expressionType);
+ requestHeader.setExp(subExpression);
+ requestHeader.setOrder(order);
+ requestHeader.setPollTime(pollTime);
+ requestHeader.setBornTime(System.currentTimeMillis());
+
+ mQClientFactory.getMQClientAPIImpl().popMessageAsync(mq.getBrokerName(), findBrokerResult.getBrokerAddr(),
+ requestHeader, timeoutMillis, popCallback);
+ }
+
+ public void popAck(String lmqTopic, String consumerGroup, Message message) {
+ long start = System.currentTimeMillis();
+ AckCallback ackCallback = new AckCallback() {
+ @Override
+ public void onSuccess(AckResult ackResult) {
+ long rt = System.currentTimeMillis() - start;
+ StatUtil.addInvoke("lmqPopAck", rt);
+ collectLmqReadWriteMatchActionRt("lmqPopAck", rt, true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(10);
+ popAckKernelImpl(lmqTopic, consumerGroup, message, 15000, this);
+ } catch (Exception ignore) {
+ logger.warn("popAck {} message error", lmqTopic, e);
+ }
+ long rt = System.currentTimeMillis() - start;
+ StatUtil.addInvoke("lmqPopAck", rt, false);
+ collectLmqReadWriteMatchActionRt("lmqPopAck", rt, false);
+ }
+ };
+
+ try {
+ popAckKernelImpl(lmqTopic, consumerGroup, message, 15000, ackCallback);
+ } catch (Exception e) {
+ logger.error("pop ack error", e);
+ }
+ }
+
+ public void popAckKernelImpl(
+ final String lmqTopic,
+ final String consumerGroup,
+ final Message message,
+ final long timeoutMillis,
+ final AckCallback ackCallback
+ ) throws RemotingException, MQBrokerException, MQClientException, InterruptedException {
+ String extraInfo = message.getUserProperty(MessageConst.PROPERTY_POP_CK);
+ String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
+
+ String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs);
+ FindBrokerResult findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false);
+ if (null == findBrokerResult) {
+ mQClientFactory.updateTopicRouteInfoFromNameServer(message.getFirstTopic());
+ findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false);
+ if (null == findBrokerResult) {
+ throw new MQClientException("The broker[" + brokerName + "] not exist", null);
+ }
+ }
+
+ int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs);
+ long queueOffset = ExtraInfoUtil.getQueueOffset(extraInfoStrs);
+
+ AckMessageRequestHeader ackMessageRequestHeader = new AckMessageRequestHeader();
+ ackMessageRequestHeader.setTopic(lmqTopic);
+ ackMessageRequestHeader.setQueueId(queueId);
+ ackMessageRequestHeader.setOffset(queueOffset);
+ ackMessageRequestHeader.setConsumerGroup(consumerGroup);
+ ackMessageRequestHeader.setExtraInfo(extraInfo);
+
+ mQClientFactory.getMQClientAPIImpl().ackMessageAsync(findBrokerResult.getBrokerAddr(), timeoutMillis, ackCallback, ackMessageRequestHeader);
+ }
}
diff --git a/pom.xml b/pom.xml
index 15839cb..7679651 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
<java.target.version>1.8</java.target.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>4.3.16.RELEASE</spring.version>
- <rocket.version>4.9.3</rocket.version>
+ <rocket.version>4.9.4</rocket.version>
<prometheus.version>0.12.0</prometheus.version>
<grpc-java.version>1.24.0</grpc-java.version>
<proto-google-common-protos.version>1.17.0</proto-google-common-protos.version>