Merge pull request #218 from apache/develop
Develop
diff --git a/README.md b/README.md
index 61009c4..a44b294 100644
--- a/README.md
+++ b/README.md
@@ -43,13 +43,15 @@
```
Some important configuration items in the **service.conf** configuration file
-| **Config Key** | **Instruction** |
-|-----------------------|--------------------------|
-| username | used for auth |
-| secretKey | used for auth |
-| NAMESRV_ADDR | specify namesrv address |
-| eventNotifyRetryTopic | notify event retry topic |
-| clientRetryTopic | client retry topic |
+| **Config Key** | **Instruction** |
+|-----------------------|---------------------------------------------------------------|
+| username | used for auth |
+| secretKey | used for auth |
+| NAMESRV_ADDR | specify namesrv address |
+| eventNotifyRetryTopic | notify event retry topic |
+| clientRetryTopic | client retry topic |
+| metaAddr | meta all nodes ip:port. Same as membersAddress in meta.config |
+
And some configuration items in the **meta.conf** configuration file
diff --git a/distribution/bin/meta.sh b/distribution/bin/meta.sh
index 091fb56..6ba17f1 100644
--- a/distribution/bin/meta.sh
+++ b/distribution/bin/meta.sh
@@ -49,7 +49,7 @@
BASEDIR=$HOME
mkdir -p $BASEDIR/logs
-mainClass="org.apache.rocketmq.mqtt.meta.starter.Startup"
+mainClass="org.apache.rocketmq.mqtt.meta.starter.MetaStartup"
function startup() {
diff --git a/mqtt-common/pom.xml b/mqtt-common/pom.xml
index 13bd032..1edfa50 100644
--- a/mqtt-common/pom.xml
+++ b/mqtt-common/pom.xml
@@ -81,6 +81,10 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
index b393abe..656aee4 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/LmqQueueStore.java
@@ -49,6 +49,19 @@
CompletableFuture<PullResult> pullMessage(String firstTopic, Queue queue, QueueOffset queueOffset, long count);
/**
+ * pop messages
+ *
+ * @param consumerGroup
+ * @param firstTopic
+ * @param queue
+ * @param count
+ * @return
+ */
+ CompletableFuture<PullResult> popMessage(String consumerGroup, String firstTopic, Queue queue, long count);
+
+ void popAck(String lmqTopic, String consumerGroup, Message message);
+
+ /**
* pull last messages
*
* @param firstTopic
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/RetainedPersistManager.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/RetainedPersistManager.java
index 20777d0..bacfe29 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/RetainedPersistManager.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/RetainedPersistManager.java
@@ -18,7 +18,6 @@
package org.apache.rocketmq.mqtt.common.facade;
import org.apache.rocketmq.mqtt.common.model.Message;
-import org.apache.rocketmq.mqtt.common.model.Subscription;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
@@ -30,5 +29,5 @@
CompletableFuture<Message> getRetainedMessage(String preciseTopic);
- CompletableFuture<ArrayList<Message>> getMsgsFromTrie(Subscription topicFilter);
+ CompletableFuture<ArrayList<Message>> getMsgsFromTrie(String firstTopic,String originTopicFilter);
}
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgPersistManager.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgPersistManager.java
index d1d9e06..49d306c 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgPersistManager.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgPersistManager.java
@@ -23,11 +23,12 @@
public interface WillMsgPersistManager {
CompletableFuture<Boolean> put(final String key, final String value);
+
CompletableFuture<Boolean> delete(final String key);
CompletableFuture<byte[]> get(final String key);
CompletableFuture<Boolean> compareAndPut(final String key, final String expectValue, final String updateValue);
- CompletableFuture<Map<String, String>> scan(final String startKey, final String endKey);
+ CompletableFuture<Map<String, String>> scan(final String startKey, final String endKey, int scanNum);
}
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/Constants.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgSender.java
similarity index 70%
copy from mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/Constants.java
copy to mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgSender.java
index e9635bf..8a3f5a0 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/Constants.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/facade/WillMsgSender.java
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package org.apache.rocketmq.mqtt.common.meta;
+package org.apache.rocketmq.mqtt.common.facade;
-public class Constants {
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import org.apache.rocketmq.mqtt.common.model.StoreResult;
- public static final String CATEGORY_RETAINED_MSG = "retainedMsg";
- public static final String CATEGORY_WILL_MSG = "willMsg";
+import java.util.concurrent.CompletableFuture;
- public static final String NOT_FOUND = "NOT_FOUND";
-
- public static final String READ_INDEX_TYPE = "readIndexType";
+public interface WillMsgSender {
+ CompletableFuture<StoreResult> sendWillMsg(String clientId, MqttPublishMessage message);
}
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/MetaConstants.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/MetaConstants.java
new file mode 100644
index 0000000..aad01f6
--- /dev/null
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/MetaConstants.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.common.meta;
+
+public class MetaConstants {
+
+ public static final String CATEGORY_RETAINED_MSG = "retainedMsg";
+ public static final String CATEGORY_WILL_MSG = "willMsg";
+ public static final String CATEGORY_HASH_KV = "hashKv";
+
+ public static final String NOT_FOUND = "NOT_FOUND";
+
+ public static final String READ_INDEX_TYPE = "readIndexType";
+ public static final String ANY_READ_TYPE = "anyRead";
+
+ public static final String OP_KV_GET = "get";
+ public static final String OP_KV_GET_HASH = "getHash";
+ public static final String OP_HASH_KV_FIELD = "field";
+ public static final String OP_KV_PUT = "put";
+ public static final String OP_KV_PUT_HASH = "putHash";
+ public static final String OP_KV_DEL = "del";
+ public static final String OP_KV_DEL_HASH = "delHash";
+
+ public static final String RETAIN_REQ_READ_PARAM_TOPIC = "topic";
+ public static final String RETAIN_REQ_READ_PARAM_FIRST_TOPIC = "firstTopic";
+ public static final String RETAIN_REQ_READ_PARAM_OPERATION_TRIE = "trie";
+ public static final String RETAIN_REQ_READ_PARAM_OPERATION_TOPIC = "topic";
+
+ public static final String RETAIN_REQ_WRITE_PARAM_FIRST_TOPIC = "firstTopic";
+ public static final String RETAIN_REQ_WRITE_PARAM_TOPIC = "topic";
+ public static final String RETAIN_REQ_WRITE_PARAM_IS_EMPTY = "isEmpty";
+ public static final String RETAIN_REQ_WRITE_PARAM_EXPIRE = "expire";
+
+ public static final String WILL_REQ_READ_GET = "get";
+ public static final String WILL_REQ_READ_SCAN = "scan";
+ public static final String WILL_REQ_READ_SCAN_NUM = "scanNum";
+ public static final String WILL_REQ_READ_START_KEY = "startKey";
+ public static final String WILL_REQ_READ_END_KEY = "endKey";
+
+ public static final String WILL_REQ_WRITE_PUT = "put";
+ public static final String WILL_REQ_WRITE_DELETE = "delete";
+
+ public static final String WILL_REQ_WRITE_COMPARE_AND_PUT = "compareAndPut";
+ public static final String WILL_REQ_WRITE_EXPECT_VALUE = "expectValue";
+
+
+}
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/RaftUtil.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/RaftUtil.java
index d730684..9c0f603 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/RaftUtil.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/RaftUtil.java
@@ -28,6 +28,7 @@
public static final int RETAIN_RAFT_GROUP_INDEX = 0;
public static final int WILL_RAFT_GROUP_INDEX = 1;
+ public static final int HASH_KV_RAFT_GROUP_INDEX = 2;
static {
RAFT_GROUPS = new String[RAFT_GROUP_NUM];
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
index f6e404c..cd764d7 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java
@@ -23,6 +23,7 @@
public static final String PLUS_SIGN = "+";
public static final String NUMBER_SIGN = "#";
+ public static final String DOLLAR_SIGN = "$";
public static final String COLON = ":";
public static final String P2P = "/p2p/";
@@ -55,5 +56,7 @@
public static final byte CTRL_2 = '\u0002';
public static final String NOT_FOUND = "NOT_FOUND";
+ public static final String SHARED_PREFIX = DOLLAR_SIGN + "share";
+ public static final String EMPTY_SHARE_NAME = "";
}
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java
index e26f780..60d4ba8 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/PullResult.java
@@ -24,6 +24,7 @@
public class PullResult {
public static final int PULL_SUCCESS = 301;
public static final int PULL_OFFSET_MOVED = 302;
+ public static final int NO_NEW_MSG = 303;
private int code = PULL_SUCCESS;
private String remark;
private List<Message> messageList;
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java
index c10921f..47b7e69 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Queue.java
@@ -51,6 +51,10 @@
return TopicUtils.isP2pTopic(queueName);
}
+ public boolean isShare() {
+ return TopicUtils.isSharedSubscription(queueName);
+ }
+
public long getQueueId() {
return queueId;
}
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
index cd2d380..8aa62fb 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Subscription.java
@@ -85,6 +85,17 @@
return topicFilter != null ? topicFilter.hashCode() : 0;
}
+ public boolean isShare() {
+ return TopicUtils.isSharedSubscription(topicFilter);
+ }
+
+ public String getSharedName() {
+ if (!isShare()) {
+ return null;
+ }
+ return TopicUtils.getSharedName(topicFilter);
+ }
+
public String getTopicFilter() {
return topicFilter;
}
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
index 166475f..6f5b40b 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
+++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/TopicUtils.java
@@ -116,6 +116,9 @@
if (topics.startsWith(Constants.MQTT_TOPIC_DELIMITER)) {
topics = topics.substring(1);
}
+ if (topics.startsWith(Constants.SHARED_PREFIX)) {
+ topics = TopicUtils.getSharedTopicFilter(topics);
+ }
String topic;
String secondTopic = null;
int index = topics.indexOf(Constants.MQTT_TOPIC_DELIMITER, 1);
@@ -185,4 +188,24 @@
public static String wrapP2pLmq(String clientId) {
return normalizeTopic(Constants.P2P + clientId);
}
+
+ // shared subscription topic filter format: $share/{ShareName}/{filter}
+ public static boolean isSharedSubscription(String topicFilter) {
+ if (StringUtils.isEmpty(topicFilter)) {
+ return false;
+ }
+ if (!topicFilter.startsWith(Constants.SHARED_PREFIX)) {
+ return false;
+ }
+ String[] arr = topicFilter.split(Constants.MQTT_TOPIC_DELIMITER);
+ return arr.length > 2;
+ }
+
+ public static String getSharedName(String topicFilter) {
+ return topicFilter.split(Constants.MQTT_TOPIC_DELIMITER)[1];
+ }
+
+ public static String getSharedTopicFilter(String topicFilter) {
+ return topicFilter.split(Constants.MQTT_TOPIC_DELIMITER, 3)[2];
+ }
}
diff --git a/mqtt-cs/pom.xml b/mqtt-cs/pom.xml
index 015f117..e3924a1 100644
--- a/mqtt-cs/pom.xml
+++ b/mqtt-cs/pom.xml
@@ -35,6 +35,10 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</dependency>
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
index 1b164ad..5299467 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
@@ -17,28 +17,16 @@
package org.apache.rocketmq.mqtt.cs.channel;
-import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import org.apache.commons.lang3.StringUtils;
-
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.mqtt.common.facade.WillMsgPersistManager;
-import org.apache.rocketmq.mqtt.common.hook.HookResult;
-import org.apache.rocketmq.mqtt.common.meta.IpUtil;
-import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
-import org.apache.rocketmq.mqtt.common.model.Constants;
-import org.apache.rocketmq.mqtt.common.model.WillMessage;
-import org.apache.rocketmq.mqtt.common.util.HostInfo;
-import org.apache.rocketmq.mqtt.common.util.MessageUtil;
import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
import org.apache.rocketmq.mqtt.cs.session.Session;
import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
-import org.apache.rocketmq.mqtt.ds.upstream.processor.PublishProcessor;
+import org.apache.rocketmq.mqtt.cs.session.loop.WillLoop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -46,11 +34,8 @@
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component
@@ -74,13 +59,7 @@
private MqttMsgId mqttMsgId;
@Resource
- private WillMsgPersistManager willMsgPersistManager;
-
- @Resource
- private PublishProcessor publishProcessor;
-
- private ThreadPoolExecutor executor;
-
+ private WillLoop willLoop;
@PostConstruct
public void init() {
@@ -92,14 +71,6 @@
closeConnect(channel, ChannelCloseFrom.SERVER, "ServerShutdown");
}
}));
-
- executor = new ThreadPoolExecutor(
- 1,
- 1,
- 1,
- TimeUnit.MINUTES,
- new LinkedBlockingQueue<>(5000),
- new ThreadFactoryImpl("DispatchWillToMQ_ "));
}
@Override
@@ -147,50 +118,7 @@
public void closeConnect(Channel channel, ChannelCloseFrom from, String reason) {
String clientId = ChannelInfo.getClientId(channel);
String channelId = ChannelInfo.getId(channel);
- String ip = IpUtil.getLocalAddressCompatible();
-
- String willKey = ip + Constants.CTRL_1 + clientId;
- CompletableFuture<byte[]> willMessageFuture = willMsgPersistManager.get(willKey);
- willMessageFuture.whenComplete((willMessageByte, throwable) -> {
- String content = new String(willMessageByte);
- if (Constants.NOT_FOUND.equals(content)) {
- return;
- }
-
- if (!"disconnect".equals(reason)) {
- WillMessage willMessage = JSON.parseObject(content, WillMessage.class);
-
- int mqttId = mqttMsgId.nextId(clientId);
- MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
- willMessage.getQos(), mqttId, willMessage.isRetain());
-
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- CompletableFuture<HookResult> upstreamHookResult = publishProcessor.process(buildMqttMessageUpContext(channel), mqttMessage);
- upstreamHookResult.whenComplete((hookResult, tb) -> {
- try {
- if (!hookResult.isSuccess()) {
- executor.submit(this);
- } else {
- willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
- if (!resultDel || tbDel != null) {
- logger.error("fail to delete will message key:{}", willKey);
- return;
- }
- logger.info("connection close and send will, delete will message key {} successfully", willKey);
- });
- }
- } catch (Throwable t) {
- logger.error("", t);
- }
- });
- }
- };
- executor.submit(runnable);
- }
- });
-
+ willLoop.closeConnect(channel, clientId, reason);
if (clientId == null) {
channelMap.remove(channelId);
sessionLoop.unloadSession(clientId, channelId);
@@ -208,15 +136,6 @@
logger.info("Close Connect of channel {} from {} by reason of {}", channel, from, reason);
}
- private MqttMessageUpContext buildMqttMessageUpContext(Channel channel) {
- MqttMessageUpContext context = new MqttMessageUpContext();
- context.setClientId(ChannelInfo.getClientId(channel));
- context.setChannelId(ChannelInfo.getId(channel));
- context.setNode(HostInfo.getInstall().getAddress());
- context.setNamespace(ChannelInfo.getNamespace(channel));
- return context;
- }
-
@Override
public void closeConnect(String channelId, String reason) {
Channel channel = channelMap.get(channelId);
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/WillLoopConf.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/WillLoopConf.java
new file mode 100644
index 0000000..c05292e
--- /dev/null
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/WillLoopConf.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.config;
+
+public class WillLoopConf {
+ private int maxScanNodeNum = 100;
+ private int maxScanClientNum = 100000;
+ private int scanNumOnce = 100;
+ private boolean enableWill = true;
+
+ public int getMaxScanNodeNum() {
+ return maxScanNodeNum;
+ }
+
+ public void setMaxScanNodeNum(int maxScanNodeNum) {
+ this.maxScanNodeNum = maxScanNodeNum;
+ }
+
+ public int getMaxScanClientNum() {
+ return maxScanClientNum;
+ }
+
+ public void setMaxScanClientNum(int maxScanClientNum) {
+ this.maxScanClientNum = maxScanClientNum;
+ }
+
+ public int getScanNumOnce() {
+ return scanNumOnce;
+ }
+
+ public void setScanNumOnce(int scanNumOnce) {
+ this.scanNumOnce = scanNumOnce;
+ }
+
+ public boolean isEnableWill() {
+ return enableWill;
+ }
+
+ public void setEnableWill(boolean enableWill) {
+ this.enableWill = enableWill;
+ }
+}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
index 0fa60e1..247ecf9 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttConnectHandler.java
@@ -34,6 +34,7 @@
import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketHandler;
import org.apache.rocketmq.mqtt.cs.protocol.mqtt.facotry.MqttMessageFactory;
import org.apache.rocketmq.mqtt.cs.session.loop.SessionLoop;
+import org.apache.rocketmq.mqtt.cs.session.loop.WillLoop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -54,6 +55,9 @@
@Resource
private SessionLoop sessionLoop;
+ @Resource
+ private WillLoop willLoop;
+
private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("check_connect_future"));
@Override
@@ -116,7 +120,7 @@
}
willMessage = new WillMessage(payload.willTopic(), payload.willMessageInBytes(), variableHeader.isWillRetain(), variableHeader.willQos());
- sessionLoop.addWillMessage(channel, willMessage);
+ willLoop.addWillMessage(channel, willMessage);
}
} catch (Exception e) {
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java
index 19ffac4..fb6ecf7 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java
@@ -144,7 +144,6 @@
private void sendRetainMessage(ChannelHandlerContext ctx, Set<Subscription> subscriptions) throws InterruptedException, RemotingException, org.apache.rocketmq.remoting.exception.RemotingException {
-
String clientId = ChannelInfo.getClientId(ctx.channel());
Session session = sessionLoop.getSession(ChannelInfo.getId(ctx.channel()));
Set<Subscription> preciseTopics = new HashSet<>();
@@ -169,8 +168,7 @@
}
for (Subscription subscription : wildcardTopics) {
-
- CompletableFuture<ArrayList<Message>> future = retainedPersistManager.getMsgsFromTrie(subscription);
+ CompletableFuture<ArrayList<Message>> future = retainedPersistManager.getMsgsFromTrie(subscription.toFirstTopic(), subscription.getTopicFilter());
future.whenComplete((msgsList, throwable) -> {
for (Message msg : msgsList) {
if (msg == null) {
@@ -179,7 +177,6 @@
pushAction._sendMessage(session, clientId, subscription, msg);
}
});
-
}
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java
index 218b460..9d656f1 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java
@@ -290,6 +290,9 @@
if (!subscriptions.containsKey(subscription.getTopicFilter())) {
return false;
}
+ if (subscription.isShare()) {
+ return true;
+ }
if (!sendingMessages.containsKey(subscription)) {
sendingMessages.putIfAbsent(subscription, new ConcurrentHashMap<>(16));
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
index 15d4ed2..70592a3 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
@@ -21,6 +21,8 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.Subscription;
@@ -56,6 +58,8 @@
@Resource
private ConnectConf connectConf;
+ @Resource
+ private LmqQueueStore lmqQueueStore;
public void messageArrive(Session session, Subscription subscription, Queue queue) {
if (session == null) {
@@ -163,6 +167,9 @@
if (subscription.isRetry()) {
message.setRetry(message.getRetry() + 1);
logger.warn("retryPush:{},{},{}", session.getClientId(), message.getMsgId(), message.getRetry());
+ } else if (subscription.isShare()) {
+ String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(message.getOriginTopic(), "/","%");
+ lmqQueueStore.popAck(lmqTopic, subscription.getSharedName(), message);
}
});
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
index 89c6a71..771d09f 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
@@ -111,7 +111,7 @@
if (queue == null) {
return;
}
- if (queue.isP2p() || queue.isRetry()) {
+ if (queue.isP2p() || queue.isRetry() || queue.isShare()) {
return;
}
addLoadEvent(queue, pair.getRight());
@@ -188,6 +188,13 @@
callbackResult(pullResult, callBackResult);
return DONE;
}
+
+ if (subscription.isShare()) {
+ CompletableFuture<PullResult> pullResult = lmqQueueStore.popMessage(subscription.getSharedName(), toFirstTopic(subscription), queue, count);
+ callbackResult(pullResult, callBackResult);
+ return DONE;
+ }
+
CacheEntry cacheEntry = cache.getIfPresent(queue);
if (cacheEntry == null) {
StatUtil.addPv("NoPullCache", 1);
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoop.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoop.java
index 3b969e8..65f995f 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoop.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoop.java
@@ -21,7 +21,6 @@
import io.netty.channel.Channel;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.Subscription;
-import org.apache.rocketmq.mqtt.common.model.WillMessage;
import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
import org.apache.rocketmq.mqtt.cs.session.Session;
@@ -96,5 +95,4 @@
*/
void notifyPullMessage(Session session, Subscription subscription, Queue queue);
- void addWillMessage(Channel channel, WillMessage willMessage);
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
index 0c8bc17..ad62abc 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.mqtt.cs.session.loop;
-import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.apache.commons.lang3.StringUtils;
@@ -26,13 +25,10 @@
import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
import org.apache.rocketmq.mqtt.common.facade.WillMsgPersistManager;
-import org.apache.rocketmq.mqtt.common.meta.IpUtil;
-import org.apache.rocketmq.mqtt.common.model.Constants;
import org.apache.rocketmq.mqtt.common.model.PullResult;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.QueueOffset;
import org.apache.rocketmq.mqtt.common.model.Subscription;
-import org.apache.rocketmq.mqtt.common.model.WillMessage;
import org.apache.rocketmq.mqtt.common.util.SpringUtils;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
@@ -393,6 +389,7 @@
throw new RuntimeException(
"invalid notifyPullMessage, subscription is null, but queue is not null," + session.getClientId());
}
+ logger.info("session loop impl doing notifyPullMessage queueFresh.freshQueue({}, {}})", session, subscription);
queueFresh.freshQueue(session, subscription);
pullMessage(session, subscription, queue);
return;
@@ -411,32 +408,6 @@
}
}
- @Override
- public void addWillMessage(Channel channel, WillMessage willMessage) {
- Session session = getSession(ChannelInfo.getId(channel));
- String clientId = ChannelInfo.getClientId(channel);
- String ip = IpUtil.getLocalAddressCompatible();
-
- if (session == null) {
- return;
- }
- if (willMessage == null) {
- return;
- }
-
- String message = JSON.toJSONString(willMessage);
- String willKey = ip + Constants.CTRL_1 + clientId;
-
- // key: ip + clientId; value: WillMessage
- willMsgPersistManager.put(willKey, message).whenComplete((result, throwable) -> {
- if (!result || throwable != null) {
- logger.error("fail to put will message key {} value {}", willKey, willMessage);
- return;
- }
- logger.debug("put will message key {} value {} successfully", willKey, message);
- });
- }
-
private String eventQueueKey(Session session, Queue queue) {
StringBuilder sb = new StringBuilder();
sb.append(ChannelInfo.getId(session.getChannel()));
@@ -540,7 +511,6 @@
}
} else if (PullResult.PULL_OFFSET_MOVED == pullResult.getCode()) {
queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
- queueOffset.setOffset(pullResult.getNextQueueOffset().getOffset());
session.markPersistOffsetFlag(true);
pullMessage(session, subscription, queue);
} else {
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java
index e27bb47..a1dd337 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java
@@ -18,17 +18,19 @@
package org.apache.rocketmq.mqtt.cs.session.loop;
import com.alibaba.fastjson.JSON;
+import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.mqtt.common.facade.WillMsgPersistManager;
-import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.common.facade.WillMsgSender;
import org.apache.rocketmq.mqtt.common.meta.IpUtil;
import org.apache.rocketmq.mqtt.common.model.Constants;
-import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
+import org.apache.rocketmq.mqtt.common.model.StoreResult;
import org.apache.rocketmq.mqtt.common.model.WillMessage;
import org.apache.rocketmq.mqtt.common.util.MessageUtil;
+import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
+import org.apache.rocketmq.mqtt.cs.config.WillLoopConf;
import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
-import org.apache.rocketmq.mqtt.ds.upstream.processor.PublishProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -42,6 +44,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
@Component
public class WillLoop {
@@ -49,6 +52,7 @@
private ScheduledThreadPoolExecutor aliveService = new ScheduledThreadPoolExecutor(2, new ThreadFactoryImpl("check_alive_thread_"));
private long checkAliveIntervalMillis = 5 * 1000;
private ThreadPoolExecutor executor;
+ private WillLoopConf willLoopConf = new WillLoopConf();
@Resource
private WillMsgPersistManager willMsgPersistManager;
@@ -57,7 +61,11 @@
private MqttMsgId mqttMsgId;
@Resource
- private PublishProcessor publishProcessor;
+ private WillMsgSender willMsgSender;
+
+ public void setWillLoopConf(WillLoopConf willLoopConf) {
+ this.willLoopConf = willLoopConf;
+ }
@PostConstruct
public void init() {
@@ -75,9 +83,12 @@
private void csLoop() {
try {
+ if (!willLoopConf.isEnableWill()) {
+ return;
+ }
String ip = IpUtil.getLocalAddressCompatible();
- String csKey = Constants.CS_ALIVE + Constants.CTRL_1 + ip;
- String masterKey = Constants.CS_MASTER;
+ String csKey = wrapAliveCsKeyPrefix() + Constants.CTRL_1 + ip;
+ String masterKey = wrapMasterKey();
long currentTime = System.currentTimeMillis();
willMsgPersistManager.put(csKey, String.valueOf(currentTime)).whenComplete((result, throwable) -> {
@@ -103,6 +114,14 @@
}
}
+ protected String wrapMasterKey() {
+ return Constants.CS_MASTER;
+ }
+
+ protected String wrapAliveCsKeyPrefix() {
+ return Constants.CS_ALIVE;
+ }
+
private boolean masterHasDown(String masterValue) {
String[] ipTime = masterValue.split(Constants.COLON);
if (ipTime.length < 2) {
@@ -115,13 +134,16 @@
private void masterLoop() {
try {
+ if (!willLoopConf.isEnableWill()) {
+ return;
+ }
String ip = IpUtil.getLocalAddressCompatible();
if (ip == null) {
logger.error("can not get local ip");
return;
}
- willMsgPersistManager.get(Constants.CS_MASTER).whenComplete((result, throwable) -> {
+ willMsgPersistManager.get(wrapMasterKey()).whenComplete((result, throwable) -> {
if (result == null || throwable != null) {
logger.error("fail to get CS_MASTER", throwable);
return;
@@ -139,16 +161,16 @@
}
// master keep alive
long currentTime = System.currentTimeMillis();
- willMsgPersistManager.compareAndPut(Constants.CS_MASTER, content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
+ willMsgPersistManager.compareAndPut(wrapMasterKey(), content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
if (!rs || tb != null) {
logger.error("{} fail to update master", ip, tb);
}
});
// master to check all cs state
- String startCSKey = Constants.CS_ALIVE + Constants.CTRL_0;
- String endCSKey = Constants.CS_ALIVE + Constants.CTRL_2;
- willMsgPersistManager.scan(startCSKey, endCSKey).whenComplete((rs, tb) -> {
+ String startCSKey = wrapAliveCsKeyPrefix() + Constants.CTRL_0;
+ String endCSKey = wrapAliveCsKeyPrefix() + Constants.CTRL_2;
+ willMsgPersistManager.scan(startCSKey, endCSKey, willLoopConf.getMaxScanNodeNum()).whenComplete((rs, tb) -> {
if (rs == null || tb != null) {
logger.error("{} master fail to scan cs", ip, tb);
return;
@@ -162,10 +184,10 @@
for (Map.Entry<String, String> entry : rs.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
- logger.info("master:{} scan cs:{}, heart:{} {}", ip, key, value, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Long.parseLong(value)));
+ logger.debug("master:{} scan cs:{}, heart:{} {}", ip, key, value, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Long.parseLong(value)));
if (System.currentTimeMillis() - Long.parseLong(value) > 10 * checkAliveIntervalMillis) {
// the cs has down
- String csIp = key.substring((Constants.CS_ALIVE + Constants.CTRL_1).length());
+ String csIp = key.substring((wrapAliveCsKeyPrefix() + Constants.CTRL_1).length());
handleShutDownCS(csIp);
willMsgPersistManager.delete(key).whenComplete((resultDel, tbDel) -> {
@@ -185,53 +207,100 @@
}
private void handleShutDownCS(String ip) {
+ AtomicInteger loopNum = new AtomicInteger(
+ Math.max(1, willLoopConf.getMaxScanClientNum()) / willLoopConf.getScanNumOnce() + 1);
+ _handleShutDownCS(ip, loopNum);
+ }
+
+ private void _handleShutDownCS(String ip, AtomicInteger loopNum) {
+ if (loopNum.getAndDecrement() <= 0) {
+ return;
+ }
String startClientKey = ip + Constants.CTRL_0;
String endClientKey = ip + Constants.CTRL_2;
- willMsgPersistManager.scan(startClientKey, endClientKey).whenComplete((willMap, throwable) -> {
- if (willMap == null || throwable != null) {
- logger.error("{} master fail to scan cs", ip, throwable);
- return;
- }
-
- if (willMap.size() == 0) {
- logger.info("the cs:{} has no will", ip);
- return;
- }
-
- for (Map.Entry<String, String> entry : willMap.entrySet()) {
- logger.info("master handle will {} {}", entry.getKey(), entry.getValue());
- String willKey = entry.getKey();
- String clientId = entry.getKey().substring((ip + Constants.CTRL_1).length());
-
- WillMessage willMessage = JSON.parseObject(entry.getValue(), WillMessage.class);
- int mqttId = mqttMsgId.nextId(clientId);
- MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
- willMessage.getQos(), mqttId, willMessage.isRetain());
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- CompletableFuture<HookResult> upstreamHookResult = publishProcessor.process(new MqttMessageUpContext(), mqttMessage);
- upstreamHookResult.whenComplete((hookResult, tb) -> {
- try {
- if (!hookResult.isSuccess()) {
- executor.submit(this);
- } else {
- willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
- if (!resultDel || tbDel != null) {
- logger.error("fail to delete will message key:{}", willKey);
- return;
- }
- logger.info("delete will message key {} successfully", willKey);
- });
- }
- } catch (Throwable t) {
- logger.error("", t);
- }
- });
+ willMsgPersistManager
+ .scan(startClientKey, endClientKey, willLoopConf.getScanNumOnce())
+ .whenComplete((willMap, throwable) -> {
+ if (willMap == null || throwable != null) {
+ logger.error("{} master fail to scan cs", ip, throwable);
+ return;
}
- };
- executor.submit(runnable);
+ for (Map.Entry<String, String> entry : willMap.entrySet()) {
+ logger.info("master handle will {} {}", entry.getKey(), entry.getValue());
+ String willKey = entry.getKey();
+ String clientId = entry.getKey().substring((ip + Constants.CTRL_1).length());
+ WillMessage willMessage = JSON.parseObject(entry.getValue(), WillMessage.class);
+ sendWillMessage(willKey, clientId, willMessage);
+ }
+ if (willMap.size() >= willLoopConf.getScanNumOnce()) {
+ _handleShutDownCS(ip, loopNum);
+ }
+ });
+ }
+
+ public void closeConnect(Channel channel, String clientId, String reason) {
+ String ip = IpUtil.getLocalAddressCompatible();
+ String willKey = ip + Constants.CTRL_1 + clientId;
+ CompletableFuture<byte[]> willMessageFuture = willMsgPersistManager.get(willKey);
+ willMessageFuture.whenComplete((willMessageByte, throwable) -> {
+ String content = new String(willMessageByte);
+ if (Constants.NOT_FOUND.equals(content)) {
+ return;
}
+
+ if (!"disconnect".equals(reason)) {
+ WillMessage willMessage = JSON.parseObject(content, WillMessage.class);
+ sendWillMessage(willKey, clientId, willMessage);
+ }
+ });
+ }
+
+ private void sendWillMessage(String willKey, String clientId, WillMessage willMessage) {
+ int mqttId = mqttMsgId.nextId(clientId);
+ MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
+ willMessage.getQos(), mqttId, willMessage.isRetain());
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ CompletableFuture<StoreResult> r = willMsgSender.sendWillMsg(clientId, mqttMessage);
+ r.whenComplete((hookResult, tb) -> {
+ try {
+ if (tb != null) {
+ logger.error("sendWillMsg failed {},{}", clientId, willKey, tb);
+ } else {
+ willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
+ if (!resultDel || tbDel != null) {
+ logger.error("fail to delete will message key:{}", willKey);
+ return;
+ }
+ logger.info("delete will message key {} successfully", willKey);
+ });
+ }
+ } catch (Throwable t) {
+ logger.error("", t);
+ }
+ });
+ }
+ };
+ executor.submit(runnable);
+ }
+
+ public void addWillMessage(Channel channel, WillMessage willMessage) {
+ String clientId = ChannelInfo.getClientId(channel);
+ String ip = IpUtil.getLocalAddressCompatible();
+ if (willMessage == null) {
+ return;
+ }
+ String message = JSON.toJSONString(willMessage);
+ String willKey = ip + Constants.CTRL_1 + clientId;
+
+ // key: ip + clientId; value: WillMessage
+ willMsgPersistManager.put(willKey, message).whenComplete((result, throwable) -> {
+ if (!result || throwable != null) {
+ logger.error("fail to put will message key {} value {}", willKey, willMessage);
+ return;
+ }
+ logger.debug("put will message key {} value {} successfully", willKey, message);
});
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
index 7fa680f..b861fda 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/Startup.java
@@ -17,15 +17,13 @@
package org.apache.rocketmq.mqtt.cs.starter;
-import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.mqtt.common.util.SpringUtils;
+
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Startup {
-
public static void main(String[] args) {
- System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring.xml");
SpringUtils.SetClassPathXmlApplicationContext(applicationContext);
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/config/ServiceConf.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/config/ServiceConf.java
index 536bd87..337a3bc 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/config/ServiceConf.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/config/ServiceConf.java
@@ -22,6 +22,7 @@
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
+import javax.annotation.PostConstruct;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -43,8 +44,10 @@
private String secretKey;
private String metaAddr;
+ private long retainMsgExpire = 3 * 24 * 60 * 60 * 1000L;
- public ServiceConf() throws IOException {
+ @PostConstruct
+ public void init() throws IOException {
ClassPathResource classPathResource = new ClassPathResource(CONF_FILE_NAME);
InputStream in = classPathResource.getInputStream();
Properties properties = new Properties();
@@ -148,4 +151,11 @@
this.secretKey = secretKey;
}
+ public long getRetainMsgExpire() {
+ return retainMsgExpire;
+ }
+
+ public void setRetainMsgExpire(long retainMsgExpire) {
+ this.retainMsgExpire = retainMsgExpire;
+ }
}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/FirstTopicManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/FirstTopicManager.java
index 79e7047..c976f1f 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/FirstTopicManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/FirstTopicManager.java
@@ -24,13 +24,13 @@
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.PermName;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java
index 95368ce..9fe2695 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java
@@ -20,12 +20,12 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaRpcClient.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaRpcClient.java
index 5814d44..e32e735 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaRpcClient.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaRpcClient.java
@@ -48,11 +48,11 @@
@Component
public class MetaRpcClient {
private static Logger logger = LoggerFactory.getLogger(MetaRpcClient.class);
+ private static ScheduledExecutorService raftClientExecutor = Executors.newSingleThreadScheduledExecutor();
private RouteTable rt;
private Configuration conf;
private CliClientServiceImpl cliClientService;
- private static ScheduledExecutorService raftClientExecutor = Executors.newSingleThreadScheduledExecutor();
- public String[] raftGroups;
+ private String[] raftGroups;
@Resource
private ServiceConf serviceConf;
@@ -68,8 +68,7 @@
for (String groupId : raftGroups) {
rt.updateConfiguration(groupId, conf);
}
- refreshLeader();
- raftClientExecutor.scheduleAtFixedRate(() -> refreshLeader(), 3, 3, TimeUnit.SECONDS);
+ raftClientExecutor.scheduleAtFixedRate(() -> refreshLeader(), 1, 3, TimeUnit.SECONDS);
}
public void initRpcServer() {
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedMsgClient.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedMsgClient.java
index eef7f82..833084c 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedMsgClient.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedMsgClient.java
@@ -21,11 +21,15 @@
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import io.netty.buffer.Unpooled;
+import io.netty.util.CharsetUtil;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.common.model.consistency.StoreMessage;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.MessageUtil;
+import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,9 +42,17 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import static org.apache.rocketmq.mqtt.common.meta.Constants.CATEGORY_RETAINED_MSG;
-import static org.apache.rocketmq.mqtt.common.meta.Constants.NOT_FOUND;
-import static org.apache.rocketmq.mqtt.common.meta.Constants.READ_INDEX_TYPE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.CATEGORY_RETAINED_MSG;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.NOT_FOUND;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.READ_INDEX_TYPE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_FIRST_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_OPERATION_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_OPERATION_TRIE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_FIRST_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_IS_EMPTY;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_EXPIRE;
import static org.apache.rocketmq.mqtt.common.meta.RaftUtil.RETAIN_RAFT_GROUP_INDEX;
@@ -49,17 +61,24 @@
private static Logger logger = LoggerFactory.getLogger(RetainedMsgClient.class);
@Resource
- private ServiceConf serviceConf;
-
- @Resource
private MetaRpcClient metaRpcClient;
+ @Resource
+ public ServiceConf serviceConf;
+
public void setRetainedMsg(String topic, Message msg, CompletableFuture<Boolean> future) throws RemotingException, InterruptedException {
+ _setRetainedMsg(topic, msg, null, future);
+ }
+
+ public void _setRetainedMsg(String topic, Message msg, Long expire, CompletableFuture<Boolean> future) throws RemotingException, InterruptedException {
String groupId = whichGroup();
HashMap<String, String> option = new HashMap<>();
- option.put("topic", topic);
- option.put("firstTopic", msg.getFirstTopic());
- option.put("isEmpty", String.valueOf(msg.isEmpty()));
+ option.put(RETAIN_REQ_WRITE_PARAM_TOPIC, topic);
+ option.put(RETAIN_REQ_WRITE_PARAM_FIRST_TOPIC, msg.getFirstTopic());
+ option.put(RETAIN_REQ_WRITE_PARAM_IS_EMPTY, String.valueOf(msg.isEmpty()));
+ if (expire != null) {
+ option.put(RETAIN_REQ_WRITE_PARAM_EXPIRE, String.valueOf(expire));
+ }
logger.debug("SetRetainedMsg option:" + option);
@@ -99,14 +118,14 @@
String groupId = whichGroup();
HashMap<String, String> option = new HashMap<>();
- option.put("firstTopic", firstTopic);
- option.put("topic", topic);
+ option.put(RETAIN_REQ_READ_PARAM_FIRST_TOPIC, firstTopic);
+ option.put(RETAIN_REQ_READ_PARAM_TOPIC, topic);
logger.debug("GetRetainedMsgsFromTrie option:" + option);
final ReadRequest request = ReadRequest.newBuilder()
.setGroup(groupId)
- .setOperation("trie")
+ .setOperation(RETAIN_REQ_READ_PARAM_OPERATION_TRIE)
.setType(READ_INDEX_TYPE)
.putAllExtData(option)
.setCategory(CATEGORY_RETAINED_MSG)
@@ -126,7 +145,18 @@
ArrayList<Message> resultList = new ArrayList<>();
for (ByteString tmp : datalistList) {
try {
- resultList.add(Message.copyFromStoreMessage(StoreMessage.parseFrom(tmp.toByteArray())));
+ Message message = Message.copyFromStoreMessage(StoreMessage.parseFrom(tmp.toByteArray()));
+ if (System.currentTimeMillis() - message.getBornTimestamp() > serviceConf.getRetainMsgExpire()) {
+ message.setPayload(Unpooled.copiedBuffer(MessageUtil.EMPTYSTRING, CharsetUtil.UTF_8).array());
+ message.setEmpty(true);
+ try {
+ _setRetainedMsg(TopicUtils.normalizeTopic(message.getOriginTopic()), message, serviceConf.getRetainMsgExpire(), new CompletableFuture<>());
+ } catch (Exception e) {
+ logger.error("", e);
+ }
+ continue;
+ }
+ resultList.add(message);
} catch (InvalidProtocolBufferException e) {
future.complete(null);
throw new RuntimeException(e);
@@ -149,11 +179,11 @@
public void GetRetainedMsg(String topic, CompletableFuture<Message> future) throws RemotingException, InterruptedException {
String groupId = whichGroup();
HashMap<String, String> option = new HashMap<>();
- option.put("topic", topic);
+ option.put(RETAIN_REQ_READ_PARAM_TOPIC, topic);
final ReadRequest request = ReadRequest.newBuilder()
.setGroup(groupId)
- .setOperation("topic")
+ .setOperation(RETAIN_REQ_READ_PARAM_OPERATION_TOPIC)
.setType(READ_INDEX_TYPE)
.putAllExtData(option)
.setCategory(CATEGORY_RETAINED_MSG)
@@ -177,7 +207,17 @@
Message message = null;
try {
message = Message.copyFromStoreMessage(StoreMessage.parseFrom(rsp.getData().toByteArray()));
- } catch (InvalidProtocolBufferException e) {
+ if (System.currentTimeMillis() - message.getBornTimestamp() > serviceConf.getRetainMsgExpire()) {
+ message.setPayload(Unpooled.copiedBuffer(MessageUtil.EMPTYSTRING, CharsetUtil.UTF_8).array());
+ message.setEmpty(true);
+ try {
+ _setRetainedMsg(topic, message, serviceConf.getRetainMsgExpire(), new CompletableFuture<>());
+ } catch (Exception e) {
+ logger.error("", e);
+ }
+ message = null;
+ }
+ } catch (Exception e) {
future.complete(null);
throw new RuntimeException(e);
}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java
index a2e0901..87c797d 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/RetainedPersistManagerImpl.java
@@ -20,19 +20,14 @@
import com.alipay.sofa.jraft.error.RemotingException;
-
import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
import org.apache.rocketmq.mqtt.common.facade.RetainedPersistManager;
import org.apache.rocketmq.mqtt.common.model.Message;
-import org.apache.rocketmq.mqtt.common.model.Subscription;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Resource;
import java.util.ArrayList;
-
-
import java.util.concurrent.CompletableFuture;
@@ -46,6 +41,7 @@
@Resource
private RetainedMsgClient retainedMsgClient;
+
public void init() {
}
@@ -81,11 +77,8 @@
return future;
}
- public CompletableFuture<ArrayList<Message>> getMsgsFromTrie(Subscription subscription) {
- String firstTopic = subscription.toFirstTopic();
- String originTopicFilter = subscription.getTopicFilter();
+ public CompletableFuture<ArrayList<Message>> getMsgsFromTrie(String firstTopic, String originTopicFilter) {
logger.debug("firstTopic={} originTopicFilter={}", firstTopic, originTopicFilter);
-
CompletableFuture<ArrayList<Message>> future = new CompletableFuture<>();
try {
retainedMsgClient.GetRetainedMsgsFromTrie(firstTopic, originTopicFilter, future);
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgClient.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgClient.java
index 207b461..7b0412e 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgClient.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgClient.java
@@ -29,8 +29,17 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import static org.apache.rocketmq.mqtt.common.meta.Constants.CATEGORY_WILL_MSG;
-import static org.apache.rocketmq.mqtt.common.meta.Constants.READ_INDEX_TYPE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.CATEGORY_WILL_MSG;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.READ_INDEX_TYPE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_END_KEY;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_GET;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_SCAN;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_SCAN_NUM;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_START_KEY;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_COMPARE_AND_PUT;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_DELETE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_EXPECT_VALUE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_PUT;
import static org.apache.rocketmq.mqtt.common.meta.RaftUtil.WILL_RAFT_GROUP_INDEX;
@@ -48,7 +57,7 @@
setGroup(groupId).
setKey(key).
setData(ByteString.copyFrom(value.getBytes())).
- setOperation("put").
+ setOperation(WILL_REQ_WRITE_PUT).
setCategory(CATEGORY_WILL_MSG).
build();
@@ -74,7 +83,7 @@
final WriteRequest request = WriteRequest.newBuilder().
setGroup(groupId).
setKey(key).
- setOperation("delete").
+ setOperation(WILL_REQ_WRITE_DELETE).
setCategory(CATEGORY_WILL_MSG).
build();
@@ -100,7 +109,7 @@
final ReadRequest request = ReadRequest.newBuilder().
setGroup(groupId).
setKey(key).
- setOperation("get").
+ setOperation(WILL_REQ_READ_GET).
setType(READ_INDEX_TYPE).
setCategory(CATEGORY_WILL_MSG).
build();
@@ -127,8 +136,8 @@
setGroup(groupId).
setKey(key).
setData(ByteString.copyFrom(updateValue.getBytes())).
- setOperation("compareAndPut").
- putExtData("expectValue", expectValue).
+ setOperation(WILL_REQ_WRITE_COMPARE_AND_PUT).
+ putExtData(WILL_REQ_WRITE_EXPECT_VALUE, expectValue).
setCategory(CATEGORY_WILL_MSG).
build();
@@ -149,13 +158,14 @@
}, 5000);
}
- public void scan(final String startKey, final String endKey, CompletableFuture<Map<String, String>> future) throws Exception {
+ public void scan(final String startKey, final String endKey, int scanNum, CompletableFuture<Map<String, String>> future) throws Exception {
String groupId = whichGroup();
final ReadRequest request = ReadRequest.newBuilder().
setGroup(groupId).
- setOperation("scan").
- putExtData("startKey", startKey).
- putExtData("endKey", endKey).
+ setOperation(WILL_REQ_READ_SCAN).
+ putExtData(WILL_REQ_READ_START_KEY, startKey).
+ putExtData(WILL_REQ_READ_END_KEY, endKey).
+ putExtData(WILL_REQ_READ_SCAN_NUM, String.valueOf(scanNum)).
setType(READ_INDEX_TYPE).
setCategory(CATEGORY_WILL_MSG).
build();
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgPersistManagerImpl.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgPersistManagerImpl.java
index 4cb5c03..504ed8b 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgPersistManagerImpl.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/WillMsgPersistManagerImpl.java
@@ -91,10 +91,10 @@
}
@Override
- public CompletableFuture<Map<String, String>> scan(String startKey, String endKey) {
+ public CompletableFuture<Map<String, String>> scan(String startKey, String endKey, int scanNum) {
CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
try {
- willMsgClient.scan(startKey, endKey, future);
+ willMsgClient.scan(startKey, endKey, scanNum, future);
return future;
} catch (Exception e) {
future.completeExceptionally(e);
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
index a6327b8..e027fa5 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
@@ -117,6 +117,8 @@
private void refresh() throws MQClientException {
Set<String> tmp = metaPersistManager.getAllFirstTopics();
+ logger.info("Notify Manager is refreshing, all first topic is " + tmp);
+
if (tmp == null || tmp.isEmpty()) {
return;
}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
index e9f87e6..094150c 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqOffsetStoreManager.java
@@ -22,9 +22,6 @@
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
-import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.mqtt.common.facade.LmqOffsetStore;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.QueueOffset;
@@ -32,6 +29,9 @@
import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index 7511fdd..49fe87e 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -20,7 +20,12 @@
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -34,13 +39,12 @@
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
import org.apache.rocketmq.mqtt.common.model.Constants;
@@ -58,6 +62,11 @@
import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
import org.apache.rocketmq.mqtt.exporter.exception.PrometheusException;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -70,12 +79,14 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Component
public class LmqQueueStoreManager implements LmqQueueStore {
private static Logger logger = LoggerFactory.getLogger(LmqQueueStoreManager.class);
private PullAPIWrapper pullAPIWrapper;
+ private MQClientInstance mQClientFactory;
private DefaultMQPullConsumer defaultMQPullConsumer;
private DefaultMQProducer defaultMQProducer;
private String consumerGroup = MixAll.CID_RMQ_SYS_PREFIX + "LMQ_PULL";
@@ -93,6 +104,7 @@
defaultMQPullConsumer.setConsumerPullTimeoutMillis(2000);
defaultMQPullConsumer.start();
pullAPIWrapper = defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper();
+ mQClientFactory = defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory();
defaultMQProducer = MqFactory.buildDefaultMQProducer("GID_LMQ_SEND", serviceConf.getProperties());
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);
@@ -155,6 +167,11 @@
new TypeReference<Map<String, String>>() {
}));
}
+
+ if (StringUtils.isNotBlank(mqMessage.getProperty(MessageConst.PROPERTY_POP_CK))) {
+ message.getUserProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, mqMessage.getProperty(MessageConst.PROPERTY_POP_CK));
+ }
+
return message;
}
@@ -445,7 +462,8 @@
if (brokerAddr != null) {
try {
- return mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, lmqTopic, (int) queue.getQueueId(), 3000L);
+ MessageQueue messageQueue = new MessageQueue(lmqTopic, queue.getBrokerName(), (int) queue.getQueueId());
+ return mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, messageQueue, 3000L);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
@@ -453,4 +471,169 @@
throw new MQClientException("The broker[" + queue.getBrokerName() + "] not exist", null);
}
+
+ @Override
+ public CompletableFuture<PullResult> popMessage(String consumerGroup, String firstTopic, Queue queue, long count) {
+ CompletableFuture<PullResult> result = new CompletableFuture<>();
+ long start = System.currentTimeMillis();
+ PopCallback popCallback = new PopCallback() {
+ @Override
+ public void onSuccess(PopResult popResult) {
+ if (popResult == null) {
+ logger.warn("PopResult is null, just wait retry");
+ return;
+ }
+
+ PullResult lmqPullResult = new PullResult();
+ if (PopStatus.FOUND == popResult.getPopStatus()) {
+ lmqPullResult.setCode(PullResult.PULL_SUCCESS);
+ List<MessageExt> messageExtList = popResult.getMsgFoundList();
+ if (messageExtList != null && !messageExtList.isEmpty()) {
+ List<Message> messageList = messageExtList.stream()
+ .map(messageExt -> toLmqMessage(queue, messageExt))
+ .collect(Collectors.toList());
+ lmqPullResult.setMessageList(messageList);
+ }
+ } else {
+ lmqPullResult.setCode(PullResult.NO_NEW_MSG);
+ }
+
+ result.complete(lmqPullResult);
+
+ long rt = System.currentTimeMillis() - start;
+ StatUtil.addInvoke("lmqPop", rt);
+ collectLmqReadWriteMatchActionRt("lmqPop", rt, true);
+ StatUtil.addPv(popResult.getPopStatus().name(), 1);
+ try {
+ MqttMetricsCollector.collectPullStatusTps(1, popResult.getPopStatus().name());
+ } catch (Throwable e) {
+ logger.error("collect prometheus error", e);
+ }
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ logger.error("pop message from {} error", queue, e);
+ result.completeExceptionally(e);
+ long rt = System.currentTimeMillis() - start;
+ StatUtil.addInvoke("lmqPop", rt, false);
+ collectLmqReadWriteMatchActionRt("lmqPop", rt, false);
+ }
+ };
+
+ try {
+ String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(queue.getQueueName(), "/","%");
+ MessageQueue firstTopicQueue = new MessageQueue(firstTopic, queue.getBrokerName(), (int) queue.getQueueId());
+ popKernelImpl(lmqTopic, firstTopicQueue, consumerGroup, ExpressionType.TAG, "*",
+ 60000, count, ConsumeInitMode.MAX, false, 15000, 15000, popCallback);
+ } catch (Throwable e) {
+ result.completeExceptionally(e);
+ }
+
+ return result;
+ }
+
+ public void popKernelImpl(
+ final String lmqTopic,
+ final MessageQueue mq,
+ final String consumerGroup,
+ final String expressionType,
+ final String subExpression,
+ final long invisibleTime,
+ final long maxNums,
+ final int initMode,
+ final boolean order,
+ final long pollTime,
+ final long timeoutMillis,
+ final PopCallback popCallback
+ ) throws RemotingException, InterruptedException, MQClientException {
+ FindBrokerResult findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
+ if (null == findBrokerResult) {
+ mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+ findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
+ if (null == findBrokerResult) {
+ throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+ }
+ }
+
+ PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
+ requestHeader.setConsumerGroup(consumerGroup);
+ requestHeader.setTopic(lmqTopic);
+ requestHeader.setQueueId(mq.getQueueId());
+ requestHeader.setMaxMsgNums((int) maxNums);
+ requestHeader.setInvisibleTime(invisibleTime);
+ requestHeader.setInitMode(initMode);
+ requestHeader.setExpType(expressionType);
+ requestHeader.setExp(subExpression);
+ requestHeader.setOrder(order);
+ requestHeader.setPollTime(pollTime);
+ requestHeader.setBornTime(System.currentTimeMillis());
+
+ mQClientFactory.getMQClientAPIImpl().popMessageAsync(mq.getBrokerName(), findBrokerResult.getBrokerAddr(),
+ requestHeader, timeoutMillis, popCallback);
+ }
+
+ public void popAck(String lmqTopic, String consumerGroup, Message message) {
+ long start = System.currentTimeMillis();
+ AckCallback ackCallback = new AckCallback() {
+ @Override
+ public void onSuccess(AckResult ackResult) {
+ long rt = System.currentTimeMillis() - start;
+ StatUtil.addInvoke("lmqPopAck", rt);
+ collectLmqReadWriteMatchActionRt("lmqPopAck", rt, true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(10);
+ popAckKernelImpl(lmqTopic, consumerGroup, message, 15000, this);
+ } catch (Exception ignore) {
+ logger.warn("popAck {} message error", lmqTopic, e);
+ }
+ long rt = System.currentTimeMillis() - start;
+ StatUtil.addInvoke("lmqPopAck", rt, false);
+ collectLmqReadWriteMatchActionRt("lmqPopAck", rt, false);
+ }
+ };
+
+ try {
+ popAckKernelImpl(lmqTopic, consumerGroup, message, 15000, ackCallback);
+ } catch (Exception e) {
+ logger.error("pop ack error", e);
+ }
+ }
+
+ public void popAckKernelImpl(
+ final String lmqTopic,
+ final String consumerGroup,
+ final Message message,
+ final long timeoutMillis,
+ final AckCallback ackCallback
+ ) throws RemotingException, MQBrokerException, MQClientException, InterruptedException {
+ String extraInfo = message.getUserProperty(MessageConst.PROPERTY_POP_CK);
+ String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
+
+ String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs);
+ FindBrokerResult findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false);
+ if (null == findBrokerResult) {
+ mQClientFactory.updateTopicRouteInfoFromNameServer(message.getFirstTopic());
+ findBrokerResult = mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false);
+ if (null == findBrokerResult) {
+ throw new MQClientException("The broker[" + brokerName + "] not exist", null);
+ }
+ }
+
+ int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs);
+ long queueOffset = ExtraInfoUtil.getQueueOffset(extraInfoStrs);
+
+ AckMessageRequestHeader ackMessageRequestHeader = new AckMessageRequestHeader();
+ ackMessageRequestHeader.setTopic(lmqTopic);
+ ackMessageRequestHeader.setQueueId(queueId);
+ ackMessageRequestHeader.setOffset(queueOffset);
+ ackMessageRequestHeader.setConsumerGroup(consumerGroup);
+ ackMessageRequestHeader.setExtraInfo(extraInfo);
+
+ mQClientFactory.getMQClientAPIImpl().ackMessageAsync(findBrokerResult.getBrokerAddr(), timeoutMillis, ackCallback, ackMessageRequestHeader);
+ }
}
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
index 0dec776..8de2726 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/processor/PublishProcessor.java
@@ -25,6 +25,7 @@
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
import org.apache.rocketmq.mqtt.common.facade.RetainedPersistManager;
+import org.apache.rocketmq.mqtt.common.facade.WillMsgSender;
import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
@@ -38,13 +39,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+
import javax.annotation.Resource;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@Component
-public class PublishProcessor implements UpstreamProcessor {
+public class PublishProcessor implements UpstreamProcessor, WillMsgSender {
private static Logger logger = LoggerFactory.getLogger(PublishProcessor.class);
@Resource
private LmqQueueStore lmqQueueStore;
@@ -60,6 +62,12 @@
@Override
public CompletableFuture<HookResult> process(MqttMessageUpContext context, MqttMessage mqttMessage) {
+ CompletableFuture<StoreResult> r = put(context, mqttMessage);
+ return r.thenCompose(storeResult -> HookResult.newHookResult(HookResult.SUCCESS, null,
+ JSON.toJSONBytes(storeResult)));
+ }
+
+ public CompletableFuture<StoreResult> put(MqttMessageUpContext context, MqttMessage mqttMessage) {
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
boolean isEmpty = false;
//deal empty payload
@@ -98,9 +106,14 @@
message.setBornTimestamp(bornTime);
message.setEmpty(isEmpty);
- CompletableFuture<StoreResult> storeResultFuture = lmqQueueStore.putMessage(queueNames, message);
- return storeResultFuture.thenCompose(storeResult -> HookResult.newHookResult(HookResult.SUCCESS, null,
- JSON.toJSONBytes(storeResult)));
+ return lmqQueueStore.putMessage(queueNames, message);
}
+
+ @Override
+ public CompletableFuture<StoreResult> sendWillMsg(String clientId, MqttPublishMessage message) {
+ MqttMessageUpContext ctx = new MqttMessageUpContext();
+ ctx.setClientId(clientId);
+ return put(ctx, message);
+ }
}
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestFirstTopicManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestFirstTopicManager.java
index d638e97..db0011f 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestFirstTopicManager.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestFirstTopicManager.java
@@ -25,14 +25,14 @@
import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.mqtt.common.facade.MetaPersistManager;
import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.junit.Assert;
import org.junit.Before;
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/WillMsgPersistManagerImplTest.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/WillMsgPersistManagerImplTest.java
index 2e1f3a8..038b4d4 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/WillMsgPersistManagerImplTest.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/WillMsgPersistManagerImplTest.java
@@ -110,7 +110,7 @@
String ip = "xxxx";
String startClientKey = ip + Constants.CTRL_0;
String endClientKey = ip + Constants.CTRL_2;
- willMsgPersistManager.scan(startClientKey, endClientKey).whenComplete((willMap, throwable) -> {
+ willMsgPersistManager.scan(startClientKey, endClientKey, 100).whenComplete((willMap, throwable) -> {
if (willMap == null || throwable != null) {
return;
}
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java
index a82c593..5ec114e 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/store/TestLmqQueueStoreManager.java
@@ -37,6 +37,7 @@
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.mqtt.common.model.Constants;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.model.MessageEvent;
@@ -201,7 +202,7 @@
when(mqClientInstance.getMQClientAPIImpl()).thenReturn(mqClientAPI);
when(mqClientInstance.findBrokerAddressInPublish(anyString())).thenReturn(
String.valueOf(new FindBrokerResult("test", false)));
- when(mqClientAPI.getMaxOffset(anyString(), anyString(), anyInt(), anyLong())).thenReturn(maxOffset);
+ when(mqClientAPI.getMaxOffset(anyString(), any(MessageQueue.class), anyLong())).thenReturn(maxOffset);
CompletableFuture<Long> queryOffsetFuture = lmqQueueStoreManager.queryQueueMaxOffset(queue);
verify(mqClientInstance, times(0)).updateTopicRouteInfoFromNameServer(any());
diff --git a/mqtt-exporter/pom.xml b/mqtt-exporter/pom.xml
index 708959a..12dc000 100644
--- a/mqtt-exporter/pom.xml
+++ b/mqtt-exporter/pom.xml
@@ -27,6 +27,10 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
index 60a0a1e..201bb06 100644
--- a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
@@ -111,6 +111,10 @@
}
private static void collect(MqttMetricsInfo mqttMetricsInfo, long val, String... labels) throws PrometheusException {
+ if (!initialized) {
+ return;
+ }
+
Map<MqttMetricsInfo, Collector> mqttMetricsInfoCollectorTypeMap = ALL_TYPE_COLLECTORS.get(mqttMetricsInfo.getType());
if (mqttMetricsInfoCollectorTypeMap == null) {
throw new PrometheusException("mqttMetricsInfo unregistered or collector type not support: " + mqttMetricsInfo);
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java
index 376ab63..3b7c793 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java
@@ -23,47 +23,46 @@
import java.util.Properties;
import org.apache.rocketmq.common.MixAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
@Component
public class MetaConf {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MetaConf.class);
private static final String CONF_FILE_NAME = "meta.conf";
private File confFile;
- private String clusterName = "defaultCluster";
private String allNodeAddress;
- private String dbPath = System.getProperty("user.home") + "/mqtt_meta/db";
- private String raftDataPath = System.getProperty("user.home") + "/mqtt_meta/raft";
private int metaPort = 25000;
private String selfAddress;
+ private int raftNodePort = 8081;
private String membersAddress;
- private int maxRetainedTopicNum = 10000;
+ private int maxRetainedTopicNum = 10000;
private int electionTimeoutMs = 1000;
private int snapshotIntervalSecs = 60 * 1000;
private String raftServiceName = System.getenv("RaftServiceName");
+ private int scanNum = 10000;
+
public MetaConf() throws IOException {
- ClassPathResource classPathResource = new ClassPathResource(CONF_FILE_NAME);
- InputStream in = classPathResource.getInputStream();
- Properties properties = new Properties();
- properties.load(in);
- in.close();
- MixAll.properties2Object(properties, this);
- this.confFile = new File(classPathResource.getURL().getFile());
+ try {
+ ClassPathResource classPathResource = new ClassPathResource(CONF_FILE_NAME);
+ InputStream in = classPathResource.getInputStream();
+ Properties properties = new Properties();
+ properties.load(in);
+ in.close();
+ MixAll.properties2Object(properties, this);
+ this.confFile = new File(classPathResource.getURL().getFile());
+ } catch (Exception e) {
+ LOGGER.error("", e);
+ }
}
public File getConfFile() {
return confFile;
}
- public String getClusterName() {
- return clusterName;
- }
-
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
-
public String getAllNodeAddress() {
return allNodeAddress;
}
@@ -72,22 +71,6 @@
this.allNodeAddress = allNodeAddress;
}
- public String getDbPath() {
- return dbPath;
- }
-
- public void setDbPath(String dbPath) {
- this.dbPath = dbPath;
- }
-
- public String getRaftDataPath() {
- return raftDataPath;
- }
-
- public void setRaftDataPath(String raftDataPath) {
- this.raftDataPath = raftDataPath;
- }
-
public int getMetaPort() {
return metaPort;
}
@@ -143,4 +126,20 @@
public void setRaftServiceName(String raftServiceName) {
this.raftServiceName = raftServiceName;
}
+
+ public int getRaftNodePort() {
+ return raftNodePort;
+ }
+
+ public void setRaftNodePort(int raftNodePort) {
+ this.raftNodePort = raftNodePort;
+ }
+
+ public int getScanNum() {
+ return scanNum;
+ }
+
+ public void setScanNum(int scanNum) {
+ this.scanNum = scanNum;
+ }
}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConfListener.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConfListener.java
index 0282824..19cbd32 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConfListener.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConfListener.java
@@ -48,6 +48,9 @@
@PostConstruct
public void start() {
confFile = metaConf.getConfFile();
+ if (confFile == null) {
+ return;
+ }
gmt.set(confFile.lastModified());
scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("ConnectConfListener"));
scheduler.scheduleWithFixedDelay(() -> {
diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/Constants.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttApplyListener.java
similarity index 70%
rename from mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/Constants.java
rename to mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttApplyListener.java
index e9635bf..16e522b 100644
--- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/meta/Constants.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttApplyListener.java
@@ -15,14 +15,18 @@
* limitations under the License.
*/
-package org.apache.rocketmq.mqtt.common.meta;
+package org.apache.rocketmq.mqtt.meta.raft;
-public class Constants {
+import com.google.protobuf.Message;
+import org.apache.rocketmq.mqtt.meta.rocksdb.RocksDBEngine;
- public static final String CATEGORY_RETAINED_MSG = "retainedMsg";
- public static final String CATEGORY_WILL_MSG = "willMsg";
+public interface MqttApplyListener {
- public static final String NOT_FOUND = "NOT_FOUND";
-
- public static final String READ_INDEX_TYPE = "readIndexType";
+ /**
+ * never to block
+ *
+ * @param message
+ * @param rocksDBEngine
+ */
+ void onApply(Message message, RocksDBEngine rocksDBEngine);
}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
index 5c1b428..5c376c3 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
@@ -46,7 +46,9 @@
import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.HostInfo;
import org.apache.rocketmq.mqtt.meta.config.MetaConf;
+import org.apache.rocketmq.mqtt.meta.raft.processor.HashKvStateProcessor;
import org.apache.rocketmq.mqtt.meta.raft.processor.RetainedMsgStateProcessor;
import org.apache.rocketmq.mqtt.meta.raft.processor.StateProcessor;
import org.apache.rocketmq.mqtt.meta.raft.processor.WillMsgStateProcessor;
@@ -93,6 +95,21 @@
private Map<String, MqttStateMachine> bizStateMachineMap = new ConcurrentHashMap<>();
public String[] raftGroups;
private RouteTable rt;
+ private HostInfo hostInfo = HostInfo.getInstall();
+
+ private MqttApplyListener mqttApplyListener;
+
+ public MqttApplyListener getMqttApplyListener() {
+ return mqttApplyListener;
+ }
+
+ public void setMqttApplyListener(MqttApplyListener mqttApplyListener) {
+ this.mqttApplyListener = mqttApplyListener;
+ }
+
+ public MetaConf getMetaConf() {
+ return metaConf;
+ }
@PostConstruct
void init() throws IOException, RocksDBException {
@@ -111,11 +128,20 @@
new LinkedBlockingQueue<>(10000),
new ThreadFactoryImpl("requestExecutor_"));
- registerStateProcessor(new RetainedMsgStateProcessor(this, metaConf.getMaxRetainedTopicNum())); //add retained msg processor
+ registerStateProcessor(new RetainedMsgStateProcessor(this)); //add retained msg processor
registerStateProcessor(new WillMsgStateProcessor(this));
+ registerStateProcessor(new HashKvStateProcessor(this));
rt = RouteTable.getInstance();
- localPeerId = PeerId.parsePeer(metaConf.getSelfAddress());
+ if (StringUtils.isNotBlank(metaConf.getSelfAddress())) {
+ localPeerId = PeerId.parsePeer(metaConf.getSelfAddress());
+ } else {
+ String localDomain = hostInfo.getAddress();
+ if (StringUtils.isNotBlank(metaConf.getRaftServiceName())) {
+ localDomain = hostInfo.getName() + "." + metaConf.getRaftServiceName();
+ }
+ this.localPeerId = new PeerId(localDomain, metaConf.getRaftNodePort());
+ }
rpcServer = createRpcServer(this, localPeerId);
NodeManager.getInstance().addAddress(localPeerId.getEndpoint());
if (!rpcServer.init(null)) {
@@ -129,7 +155,7 @@
FileUtils.forceMkdir(new File(rdbPath));
RocksDBEngine rocksDBEngine = new RocksDBEngine(rdbPath);
rocksDBEngine.init();
- MqttStateMachine sm = new MqttStateMachine(this);
+ MqttStateMachine sm = new MqttStateMachine(this, group);
sm.setRocksDBEngine(rocksDBEngine);
createRaftNode(group, sm);
}
@@ -162,7 +188,7 @@
nodeOptions.setSnapshotIntervalSecs(metaConf.getSnapshotIntervalSecs());
final Configuration initConf = new Configuration();
- String initConfStr = metaConf.getMembersAddress();
+ String initConfStr = initConfStr();
if (!initConf.parse(initConfStr)) {
throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr);
}
@@ -180,6 +206,21 @@
return node;
}
+ public String initConfStr() {
+ if (StringUtils.isNotBlank(metaConf.getMembersAddress())) {
+ return metaConf.getMembersAddress();
+ }
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 3; i++) {
+ sb.append(metaConf.getRaftServiceName() + "-" + i);
+ sb.append("." + metaConf.getRaftServiceName());
+ sb.append(":" + metaConf.getRaftNodePort());
+ sb.append(",");
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ return sb.toString();
+ }
+
private void registerBizStateMachine(String groupId, MqttStateMachine sm) {
MqttStateMachine prv = bizStateMachineMap.putIfAbsent(groupId, sm);
if (prv != null) {
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
index b763a29..8422065 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
@@ -29,6 +29,7 @@
import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.StatUtil;
import org.apache.rocketmq.mqtt.meta.raft.processor.StateProcessor;
import org.apache.rocketmq.mqtt.meta.rocksdb.RocksDBEngine;
import org.slf4j.Logger;
@@ -43,64 +44,58 @@
protected Node node;
protected RocksDBEngine rocksDBEngine;
protected final MqttRaftServer server;
+ private String group;
- public MqttStateMachine(MqttRaftServer server) {
+ public MqttStateMachine(MqttRaftServer server, String group) {
this.server = server;
+ this.group = group;
}
@Override
public void onApply(Iterator iterator) {
- int index = 0;
- int applied = 0;
Message message;
MqttClosure closure = null;
- try {
- while (iterator.hasNext()) {
- Status status = Status.OK();
- try {
- if (iterator.done() != null) {
- closure = (MqttClosure) iterator.done();
- message = closure.getMessage();
- } else {
- final ByteBuffer data = iterator.getData();
- message = parseMessage(data.array());
+ while (iterator.hasNext()) {
+ long start = System.currentTimeMillis();
+ Status status = Status.OK();
+ try {
+ if (iterator.done() != null) {
+ closure = (MqttClosure) iterator.done();
+ message = closure.getMessage();
+ } else {
+ final ByteBuffer data = iterator.getData();
+ message = parseMessage(data.array());
+ }
+ if (message instanceof WriteRequest) {
+ WriteRequest writeRequest = (WriteRequest) message;
+ StateProcessor processor = server.getProcessor(writeRequest.getCategory());
+ Response response = processor.onWriteRequest((WriteRequest) message);
+ if (Objects.nonNull(closure)) {
+ closure.setResponse(response);
}
-
- LOGGER.debug("get message:{} and apply to state machine", message);
-
- if (message instanceof WriteRequest) {
- WriteRequest writeRequest = (WriteRequest) message;
- StateProcessor processor = server.getProcessor(writeRequest.getCategory());
- Response response = processor.onWriteRequest((WriteRequest) message);
- if (Objects.nonNull(closure)) {
- closure.setResponse(response);
- }
- }
-
- if (message instanceof ReadRequest) {
- ReadRequest request = (ReadRequest) message;
- StateProcessor processor = server.getProcessor(request.getCategory());
- Response response = processor.onReadRequest((ReadRequest) message);
- if (Objects.nonNull(closure)) {
- closure.setResponse(response);
- }
- }
- } catch (Throwable e) {
- index++;
- status.setError(RaftError.UNKNOWN, e.toString());
- Optional.ofNullable(closure).ifPresent(closure1 -> closure1.setThrowable(e));
- throw e;
- } finally {
- Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status));
}
- applied++;
- index++;
- iterator.next();
+ if (message instanceof ReadRequest) {
+ ReadRequest request = (ReadRequest) message;
+ StateProcessor processor = server.getProcessor(request.getCategory());
+ Response response = processor.onReadRequest((ReadRequest) message);
+ if (Objects.nonNull(closure)) {
+ closure.setResponse(response);
+ }
+ }
+ MqttApplyListener applyListener = server.getMqttApplyListener();
+ if (applyListener != null) {
+ applyListener.onApply(message, rocksDBEngine);
+ }
+ } catch (Throwable e) {
+ LOGGER.error("stateMachine meet critical error", e);
+ status.setError(RaftError.UNKNOWN, e.toString());
+ Optional.ofNullable(closure).ifPresent(closure1 -> closure1.setThrowable(e));
+ } finally {
+ Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status));
+ StatUtil.addInvoke(StatUtil.buildKey(group, "apply"), System.currentTimeMillis() - start);
}
- } catch (Throwable t) {
- LOGGER.error("stateMachine meet critical error", t);
- //iterator.setErrorAndRollback(index - applied, new Status(RaftError.ESTATEMACHINE, "StateMachine meet critical error: %s.", t.toString()));
+ iterator.next();
}
}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/HashKvStateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/HashKvStateProcessor.java
new file mode 100644
index 0000000..935b646
--- /dev/null
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/HashKvStateProcessor.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.meta.raft.processor;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
+import com.google.protobuf.ByteString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
+import org.apache.rocketmq.mqtt.common.model.consistency.Response;
+import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.StatUtil;
+import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
+import org.apache.rocketmq.mqtt.meta.raft.MqttStateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.CATEGORY_HASH_KV;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_HASH_KV_FIELD;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_KV_DEL;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_KV_DEL_HASH;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_KV_GET;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_KV_GET_HASH;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_KV_PUT;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.OP_KV_PUT_HASH;
+
+public class HashKvStateProcessor extends StateProcessor {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HashKvStateProcessor.class);
+ private MqttRaftServer server;
+
+ public HashKvStateProcessor(MqttRaftServer server) {
+ this.server = server;
+ }
+
+ @Override
+ public Response onReadRequest(ReadRequest request) throws Exception {
+ long start = System.currentTimeMillis();
+ try {
+ MqttStateMachine sm = server.getMqttStateMachine(request.getGroup());
+ if (sm == null) {
+ LOGGER.error("Fail to process will ReadRequest , Not Found SM for {}", request.getGroup());
+ return null;
+ }
+ String operation = request.getOperation();
+ String key = request.getKey();
+ if (OP_KV_GET.equals(operation)) {
+ byte[] value = getRdb(sm.getRocksDBEngine(), key.getBytes(StandardCharsets.UTF_8));
+ if (value == null) {
+ return Response.newBuilder()
+ .setSuccess(true)
+ .build();
+ }
+ return Response.newBuilder()
+ .setSuccess(true)
+ .setData(ByteString.copyFrom(value))
+ .build();
+ } else if (OP_KV_GET_HASH.equals(operation)) {
+ byte[] value = getRdb(sm.getRocksDBEngine(), key.getBytes(StandardCharsets.UTF_8));
+ if (value == null) {
+ return Response.newBuilder()
+ .setSuccess(true)
+ .build();
+ }
+ Map<String, String> map = JSONObject.parseObject(
+ new String(value, StandardCharsets.UTF_8),
+ new TypeReference<Map<String, String>>() {
+ });
+ String field = null;
+ if (request.getExtDataMap() != null) {
+ field = request.getExtDataMap().get(OP_HASH_KV_FIELD);
+ }
+ Map<String, String> result = new HashMap<>();
+ if (StringUtils.isNotBlank(field)) {
+ result.put(field, map.get(field));
+ } else {
+ result.putAll(map);
+ }
+ return Response.newBuilder()
+ .setSuccess(true)
+ .putAllDataMap(result)
+ .build();
+ }
+ } catch (Throwable e) {
+ LOGGER.error("Fail to process will ReadRequest, k {}", request.getKey(), e);
+ throw e;
+ } finally {
+ StatUtil.addInvoke("HashKvRead", System.currentTimeMillis() - start);
+ }
+ return null;
+ }
+
+ @Override
+ public Response onWriteRequest(WriteRequest log) throws Exception {
+ long start = System.currentTimeMillis();
+ try {
+ MqttStateMachine sm = server.getMqttStateMachine(log.getGroup());
+ if (sm == null) {
+ LOGGER.error("Fail to process will WriteRequest , Not Found SM for {}", log.getGroup());
+ return null;
+ }
+ String operation = log.getOperation();
+ String key = log.getKey();
+ byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ byte[] value = log.getData().toByteArray();
+ if (value == null) {
+ return null;
+ }
+ if (OP_KV_PUT.equals(operation)) {
+ return put(sm.getRocksDBEngine(), keyBytes, value);
+ } else if (OP_KV_DEL.equals(operation)) {
+ return delete(sm.getRocksDBEngine(), keyBytes);
+ } else if (OP_KV_PUT_HASH.equals(operation)) {
+ Map<String, String> map = new HashMap<>();
+ map.putAll(log.getExtDataMap());
+ byte[] oldValue = getRdb(sm.getRocksDBEngine(), keyBytes);
+ if (oldValue != null && oldValue.length > 1) {
+ Map<String, String> oldMap = JSONObject.parseObject(
+ new String(oldValue, StandardCharsets.UTF_8),
+ new TypeReference<Map<String, String>>() {
+ });
+ oldMap.putAll(map);
+ map.putAll(oldMap);
+ }
+ return put(sm.getRocksDBEngine(), keyBytes, JSON.toJSONBytes(map));
+ } else if (OP_KV_DEL_HASH.equals(operation)) {
+ String field = log.getExtDataMap().get(OP_HASH_KV_FIELD);
+ if (StringUtils.isBlank(field)) {
+ return Response.newBuilder()
+ .setSuccess(false)
+ .setErrMsg("No Found Field")
+ .build();
+ }
+ byte[] oldValue = getRdb(sm.getRocksDBEngine(), keyBytes);
+ if (oldValue == null || oldValue.length < 1) {
+ return Response.newBuilder()
+ .setSuccess(true)
+ .build();
+ }
+ Map<String, String> oldMap = JSONObject.parseObject(
+ new String(oldValue, StandardCharsets.UTF_8),
+ new TypeReference<Map<String, String>>() {
+ });
+ if (!oldMap.containsKey(field)) {
+ return Response.newBuilder()
+ .setSuccess(true)
+ .build();
+ }
+ oldMap.remove(field);
+ if (oldMap.isEmpty()) {
+ return delete(sm.getRocksDBEngine(), keyBytes);
+ } else {
+ return put(sm.getRocksDBEngine(), keyBytes, JSON.toJSONBytes(oldMap));
+ }
+ }
+ } catch (Throwable e) {
+ LOGGER.error("Fail to process will WriteRequest, k {}", log.getKey(), e);
+ throw e;
+ } finally {
+ StatUtil.addInvoke("HashKvWrite", System.currentTimeMillis() - start);
+ }
+ return null;
+ }
+
+ @Override
+ public String groupCategory() {
+ return CATEGORY_HASH_KV;
+ }
+}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
index f12306b..deca317 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
@@ -19,14 +19,17 @@
import com.alibaba.fastjson.JSON;
import com.google.protobuf.ByteString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.mqtt.common.meta.MetaConstants;
import org.apache.rocketmq.mqtt.common.model.Trie;
import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
import org.apache.rocketmq.mqtt.common.model.consistency.Response;
+import org.apache.rocketmq.mqtt.common.model.consistency.StoreMessage;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.StatUtil;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
import org.apache.rocketmq.mqtt.meta.raft.MqttStateMachine;
-import org.apache.rocketmq.mqtt.common.meta.Constants;
import org.apache.rocketmq.mqtt.meta.rocksdb.RocksDBEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,33 +39,37 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_FIRST_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_OPERATION_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_READ_PARAM_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_FIRST_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_IS_EMPTY;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_TOPIC;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.RETAIN_REQ_WRITE_PARAM_EXPIRE;
+
public class RetainedMsgStateProcessor extends StateProcessor {
private static Logger logger = LoggerFactory.getLogger(RetainedMsgStateProcessor.class);
private final ConcurrentHashMap<String, Trie<String, String>> retainedMsgTopicTrie = new ConcurrentHashMap<>(); //key:firstTopic value:retained topic Trie
private MqttRaftServer server;
- private int maxRetainedTopicNum;
- public RetainedMsgStateProcessor(MqttRaftServer server, int maxRetainedTopicNum) {
+ public RetainedMsgStateProcessor(MqttRaftServer server) {
this.server = server;
- this.maxRetainedTopicNum = maxRetainedTopicNum;
}
@Override
public Response onReadRequest(ReadRequest request) {
+ long start = System.currentTimeMillis();
try {
MqttStateMachine sm = server.getMqttStateMachine(request.getGroup());
if (sm == null) {
logger.error("Fail to process RetainedMsg ReadRequest , Not Found SM for {}", request.getGroup());
return null;
}
- String topic = request.getExtDataMap().get("topic");
- String firstTopic = request.getExtDataMap().get("firstTopic");
+ String topic = request.getExtDataMap().get(RETAIN_REQ_READ_PARAM_TOPIC);
+ String firstTopic = request.getExtDataMap().get(RETAIN_REQ_READ_PARAM_FIRST_TOPIC);
String operation = request.getOperation();
-
- logger.info("FirstTopic:{} Topic:{} Operation:{}", firstTopic, topic, operation);
-
- if (operation.equals("topic")) { //return retained msg
+ if (operation.equals(RETAIN_REQ_READ_PARAM_OPERATION_TOPIC)) { //return retained msg
return get(sm.getRocksDBEngine(), topic.getBytes(StandardCharsets.UTF_8));
} else { //return retain msgs of matched Topic
String wrapTrieFirstTopic = wrapTrieFirstTopic(firstTopic);
@@ -95,24 +102,27 @@
.addAllDatalist(msgResults)//return retained msgs of matched Topic
.build();
}
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("", e);
return Response.newBuilder()
.setSuccess(false)
.setErrMsg(e.getMessage())
.build();
+ } finally {
+ StatUtil.addInvoke("RetainRead", System.currentTimeMillis() - start);
}
}
- boolean setRetainedMsg(RocksDBEngine rocksDBEngine, String firstTopic, String topic, boolean isEmpty, byte[] msg) throws Exception {
+ boolean setRetainedMsg(RocksDBEngine rocksDBEngine, String firstTopic, String topic, boolean isEmpty, byte[] msg, Long expire) throws Exception {
String wrapTrieFirstTopic = wrapTrieFirstTopic(firstTopic);
// if the trie of firstTopic doesn't exist
if (!retainedMsgTopicTrie.containsKey(wrapTrieFirstTopic)) {
retainedMsgTopicTrie.put(wrapTrieFirstTopic, new Trie<String, String>());
}
if (isEmpty) {
- //delete from trie
- logger.info("Delete the topic {} retained message", topic);
+ if (expire != null && !checkExpire(rocksDBEngine, expire, topic)) {
+ return true;
+ }
delete(rocksDBEngine, topic.getBytes(StandardCharsets.UTF_8));
Trie<String, String> trie = retainedMsgTopicTrie.get(wrapTrieFirstTopic);
if (trie != null) {
@@ -122,7 +132,7 @@
} else {
//Add to trie
Trie<String, String> trie = retainedMsgTopicTrie.get(wrapTrieFirstTopic);
- if (trie.getNodePath().size() < maxRetainedTopicNum) {
+ if (trie.getNodePath().size() < server.getMetaConf().getMaxRetainedTopicNum()) {
put(rocksDBEngine, topic.getBytes(StandardCharsets.UTF_8), msg);
trie.addNode(topic, "", "");
put(rocksDBEngine, wrapTrieFirstTopic.getBytes(StandardCharsets.UTF_8), JSON.toJSONBytes(trie));
@@ -134,48 +144,63 @@
return true;
}
+ private boolean checkExpire(RocksDBEngine rocksDBEngine, long expire, String topic) {
+ try {
+ byte[] value = getRdb(rocksDBEngine, topic.getBytes(StandardCharsets.UTF_8));
+ if (value == null) {
+ return true;
+ }
+ return System.currentTimeMillis() - StoreMessage.parseFrom(value).getBornTimestamp() > expire;
+ } catch (Throwable t) {
+ logger.error("", t);
+ }
+ return false;
+ }
+
private String wrapTrieFirstTopic(String firstTopic) {
return "$" + firstTopic + "$";
}
@Override
public Response onWriteRequest(WriteRequest writeRequest) {
+ long start = System.currentTimeMillis();
try {
MqttStateMachine sm = server.getMqttStateMachine(writeRequest.getGroup());
if (sm == null) {
logger.error("Fail to process RetainedMsg WriteRequest , Not Found SM for {}", writeRequest.getGroup());
return null;
}
- String firstTopic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("firstTopic")); //retained msg firstTopic
- String topic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get("topic")); //retained msg topic
- boolean isEmpty = Boolean.parseBoolean(writeRequest.getExtDataMap().get("isEmpty")); //retained msg is empty
+ String firstTopic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get(RETAIN_REQ_WRITE_PARAM_FIRST_TOPIC)); //retained msg firstTopic
+ String topic = TopicUtils.normalizeTopic(writeRequest.getExtDataMap().get(RETAIN_REQ_WRITE_PARAM_TOPIC)); //retained msg topic
+ boolean isEmpty = Boolean.parseBoolean(writeRequest.getExtDataMap().get(RETAIN_REQ_WRITE_PARAM_IS_EMPTY)); //retained msg is empty
+ String expireStr = writeRequest.getExtDataMap().get(RETAIN_REQ_WRITE_PARAM_EXPIRE);
+ Long expire = StringUtils.isNotBlank(expireStr) ? Long.parseLong(expireStr) : null;
byte[] message = writeRequest.getData().toByteArray();
- boolean res = setRetainedMsg(sm.getRocksDBEngine(), firstTopic, topic, isEmpty, message);
+ boolean res = setRetainedMsg(sm.getRocksDBEngine(), firstTopic, topic, isEmpty, message, expire);
if (!res) {
- logger.warn("Put the topic {} retained message failed! Exceeded maximum number of reserved topics limit.", topic);
return Response.newBuilder()
.setSuccess(false)
- .setErrMsg("Exceeded maximum number of reserved topics limit.")
+ .setErrMsg("f")
.build();
}
- logger.info("Put the topic {} retained message success!", topic);
-
return Response.newBuilder()
.setSuccess(true)
.setData(ByteString.copyFrom(JSON.toJSONBytes(topic)))
.build();
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("Put the retained message error!", e);
return Response.newBuilder()
.setSuccess(false)
.setErrMsg(e.getMessage())
.build();
+ } finally {
+ StatUtil.addInvoke("RetainWrite", System.currentTimeMillis() - start);
}
}
@Override
public String groupCategory() {
- return Constants.CATEGORY_RETAINED_MSG;
+ return MetaConstants.CATEGORY_RETAINED_MSG;
}
}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
index 3fe771c..e2fef2b 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
@@ -22,7 +22,7 @@
import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
-import org.apache.rocketmq.mqtt.common.meta.Constants;
+import org.apache.rocketmq.mqtt.common.meta.MetaConstants;
import org.apache.rocketmq.mqtt.meta.rocksdb.RocksDBEngine;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
@@ -70,7 +70,7 @@
try {
byte[] value = rocksDBEngine.getRdb().get(key);
if (value == null) {
- value = Constants.NOT_FOUND.getBytes();
+ value = MetaConstants.NOT_FOUND.getBytes();
}
return Response.newBuilder()
.setSuccess(true)
@@ -153,7 +153,7 @@
}
}
- public Response scan(RocksDBEngine rocksDBEngine, byte[] startKey, byte[] endKey) throws Exception {
+ public Response scan(RocksDBEngine rocksDBEngine, byte[] startKey, byte[] endKey, long scanNum) throws Exception {
Map<String, String> result = new HashMap<>();
final Lock readLock = rocksDBEngine.getReadWriteLock().readLock();
readLock.lock();
@@ -170,6 +170,9 @@
break;
}
result.put(new String(key), new String(it.value()));
+ if (result.size() >= scanNum) {
+ break;
+ }
it.next();
}
return Response.newBuilder()
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java
index 5376a99..2e44904 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java
@@ -17,16 +17,27 @@
package org.apache.rocketmq.mqtt.meta.raft.processor;
-import org.apache.rocketmq.mqtt.common.meta.Constants;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.mqtt.common.meta.MetaConstants;
import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.StatUtil;
import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
import org.apache.rocketmq.mqtt.meta.raft.MqttStateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.rocketmq.mqtt.common.meta.Constants.CATEGORY_WILL_MSG;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.CATEGORY_WILL_MSG;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_END_KEY;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_GET;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_SCAN;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_SCAN_NUM;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_READ_START_KEY;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_COMPARE_AND_PUT;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_DELETE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_EXPECT_VALUE;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.WILL_REQ_WRITE_PUT;
public class WillMsgStateProcessor extends StateProcessor {
private static Logger logger = LoggerFactory.getLogger(WillMsgStateProcessor.class);
@@ -38,6 +49,7 @@
@Override
public Response onReadRequest(ReadRequest request) throws Exception {
+ long start = System.currentTimeMillis();
try {
MqttStateMachine sm = server.getMqttStateMachine(request.getGroup());
if (sm == null) {
@@ -46,27 +58,35 @@
}
String operation = request.getOperation();
String key = request.getKey();
- if ("get".equals(operation)) {
+ if (WILL_REQ_READ_GET.equals(operation)) {
return get(sm.getRocksDBEngine(), key.getBytes());
- } else if ("scan".equals(operation)) {
- String startKey = request.getExtDataMap().get("startKey");
- String endKey = request.getExtDataMap().get("endKey");
- return scan(sm.getRocksDBEngine(), startKey.getBytes(), endKey.getBytes());
+ } else if (WILL_REQ_READ_SCAN.equals(operation)) {
+ String startKey = request.getExtDataMap().get(WILL_REQ_READ_START_KEY);
+ String endKey = request.getExtDataMap().get(WILL_REQ_READ_END_KEY);
+ String scanNumStr = request.getExtDataMap().get(WILL_REQ_READ_SCAN_NUM);
+ long scanNum = server.getMetaConf().getScanNum();
+ if (StringUtils.isNotBlank(scanNumStr)) {
+ scanNum = Long.parseLong(scanNumStr);
+ }
+ return scan(sm.getRocksDBEngine(), startKey.getBytes(), endKey.getBytes(), scanNum);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
if (request.getKey() == null) {
logger.error("Fail to delete, startKey {}, endKey {}", request.getExtDataMap().get("startKey"), request.getExtDataMap().get("endKey"), e);
} else {
logger.error("Fail to process will ReadRequest, k {}", request.getKey(), e);
}
-
throw e;
+ } finally {
+ StatUtil.addInvoke("WillRead", System.currentTimeMillis() - start);
}
+
return null;
}
@Override
public Response onWriteRequest(WriteRequest log) throws Exception {
+ long start = System.currentTimeMillis();
try {
MqttStateMachine sm = server.getMqttStateMachine(log.getGroup());
if (sm == null) {
@@ -77,20 +97,22 @@
String key = log.getKey();
byte[] value = log.getData().toByteArray();
- if ("put".equals(operation)) {
+ if (WILL_REQ_WRITE_PUT.equals(operation)) {
return put(sm.getRocksDBEngine(), key.getBytes(), value);
- } else if ("delete".equals(operation)) {
+ } else if (WILL_REQ_WRITE_DELETE.equals(operation)) {
return delete(sm.getRocksDBEngine(), key.getBytes());
- } else if ("compareAndPut".equals(operation)) {
- String expectValue = log.getExtDataMap().get("expectValue");
- if (Constants.NOT_FOUND.equals(expectValue)) {
+ } else if (WILL_REQ_WRITE_COMPARE_AND_PUT.equals(operation)) {
+ String expectValue = log.getExtDataMap().get(WILL_REQ_WRITE_EXPECT_VALUE);
+ if (MetaConstants.NOT_FOUND.equals(expectValue)) {
return compareAndPut(sm.getRocksDBEngine(), key.getBytes(), null, value);
}
return compareAndPut(sm.getRocksDBEngine(), key.getBytes(), log.getExtDataMap().get("expectValue").getBytes(), value);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("Fail to process will WriteRequest, k {}", log.getKey(), e);
throw e;
+ } finally {
+ StatUtil.addInvoke("WillWrite", System.currentTimeMillis() - start);
}
return null;
}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/AbstractRpcProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/AbstractRpcProcessor.java
index 08463f2..3fb3e1c 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/AbstractRpcProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/AbstractRpcProcessor.java
@@ -128,7 +128,7 @@
Response response = processor.onReadRequest(request);
rpcCtx.sendResponse(response);
} catch (Throwable t) {
- LOGGER.info("process read request in handleReadIndex error : {}", t.toString());
+ LOGGER.error("process read request in handleReadIndex error : {}", t.toString());
rpcCtx.sendResponse(Response.newBuilder().setErrMsg(t.toString()).setSuccess(false).build());
}
return;
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttReadRpcProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttReadRpcProcessor.java
index f8d6755..fd436de 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttReadRpcProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttReadRpcProcessor.java
@@ -19,14 +19,22 @@
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
-import org.apache.rocketmq.mqtt.common.meta.Constants;
+import org.apache.rocketmq.mqtt.common.meta.MetaConstants;
import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
+import org.apache.rocketmq.mqtt.common.model.consistency.Response;
+import org.apache.rocketmq.mqtt.common.util.StatUtil;
import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
+import org.apache.rocketmq.mqtt.meta.raft.processor.StateProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
/**
* The RPC Processor for read request.
*/
public class MqttReadRpcProcessor extends AbstractRpcProcessor implements RpcProcessor<ReadRequest> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MqttReadRpcProcessor.class);
private final MqttRaftServer server;
public MqttReadRpcProcessor(MqttRaftServer server) {
@@ -35,13 +43,32 @@
@Override
public void handleRequest(RpcContext rpcCtx, ReadRequest request) {
- if (Constants.READ_INDEX_TYPE.equals(request.getType())) {
+ StatUtil.addPv(StatUtil.buildKey("ReadRpc", request.getGroup()), 1);
+ if (MetaConstants.READ_INDEX_TYPE.equals(request.getType())) {
handleReadIndex(server, request.getGroup(), rpcCtx, request);
+ } else if (MetaConstants.ANY_READ_TYPE.equals(request.getType())) {
+ anyRead(rpcCtx, request);
} else {
handleRequest(server, request.getGroup(), rpcCtx, request);
}
}
+ private void anyRead(RpcContext rpcCtx, ReadRequest request) {
+ final StateProcessor processor = server.getProcessor(request.getCategory());
+ if (Objects.isNull(processor)) {
+ rpcCtx.sendResponse(Response.newBuilder().setSuccess(false)
+ .setErrMsg("Could not find the StateProcessor: " + request.getCategory()).build());
+ return;
+ }
+ try {
+ Response response = processor.onReadRequest(request);
+ rpcCtx.sendResponse(response);
+ } catch (Throwable t) {
+ LOGGER.error("process read request in anyRead error : {}", t.toString());
+ rpcCtx.sendResponse(Response.newBuilder().setErrMsg(t.toString()).setSuccess(false).build());
+ }
+ }
+
@Override
public String interest() {
return ReadRequest.class.getName();
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttWriteRpcProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttWriteRpcProcessor.java
index 80f0569..92f36e5 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttWriteRpcProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/MqttWriteRpcProcessor.java
@@ -20,6 +20,7 @@
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.StatUtil;
import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
/**
@@ -34,6 +35,7 @@
@Override
public void handleRequest(RpcContext rpcCtx, WriteRequest request) {
+ StatUtil.addPv(StatUtil.buildKey("WriteRpc", request.getGroup()), 1);
handleRequest(server, request.getGroup(), rpcCtx, request);
}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/Startup.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/MetaStartup.java
similarity index 85%
rename from mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/Startup.java
rename to mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/MetaStartup.java
index 8b5e8b3..1adab2e 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/Startup.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/starter/MetaStartup.java
@@ -16,16 +16,18 @@
*/
package org.apache.rocketmq.mqtt.meta.starter;
-import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.mqtt.meta.util.SpringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
-public class Startup {
+public class MetaStartup {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MetaStartup.class);
+
public static void main(String[] args) {
- System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:meta_spring.xml");
SpringUtil.setApplicationContext(applicationContext);
- System.out.println("start meta ...");
+ LOGGER.info("start meta ...");
}
}
\ No newline at end of file
diff --git a/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java b/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java
index e593527..577d29a 100644
--- a/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java
+++ b/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClientTest.java
@@ -37,7 +37,7 @@
import org.apache.rocketmq.mqtt.common.model.consistency.StoreMessage;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
-import org.apache.rocketmq.mqtt.common.meta.Constants;
+import org.apache.rocketmq.mqtt.common.meta.MetaConstants;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -61,7 +61,7 @@
String originTopic = "test-f1/f2/";
String topicFilter = "test-f1/+/";
- final String groupId = Constants.CATEGORY_RETAINED_MSG + "-" + 0;
+ final String groupId = MetaConstants.CATEGORY_RETAINED_MSG + "-" + 0;
final String confStr = "127.0.0.1:25001";
CliClientServiceImpl cliClientService = new CliClientServiceImpl();
Configuration conf = new Configuration();
@@ -182,7 +182,7 @@
option.put("flag", "topic");
option.put("topic", firstTopic + "/t1/");
- final ReadRequest request = ReadRequest.newBuilder().setGroup("retainedmsg-0").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build();
+ final ReadRequest request = ReadRequest.newBuilder().setGroup("retainedmsg-0").setType(MetaConstants.READ_INDEX_TYPE).putAllExtData(option).build();
CompletableFuture<Message> future = new CompletableFuture<>();
@@ -242,7 +242,7 @@
.addAllDatalist(msgResults)
.build());
- final ReadRequest request = ReadRequest.newBuilder().setGroup("retainedMsg-0").setOperation("trie").setType(Constants.READ_INDEX_TYPE).putAllExtData(option).build();
+ final ReadRequest request = ReadRequest.newBuilder().setGroup("retainedMsg-0").setOperation("trie").setType(MetaConstants.READ_INDEX_TYPE).putAllExtData(option).build();
try {
cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {
diff --git a/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/WillMsgStateProcessorTest.java b/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/WillMsgStateProcessorTest.java
index bcbb013..08690d7 100644
--- a/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/WillMsgStateProcessorTest.java
+++ b/mqtt-meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/WillMsgStateProcessorTest.java
@@ -33,7 +33,7 @@
import java.io.File;
import java.io.IOException;
-import static org.apache.rocketmq.mqtt.common.meta.Constants.NOT_FOUND;
+import static org.apache.rocketmq.mqtt.common.meta.MetaConstants.NOT_FOUND;
@RunWith(MockitoJUnitRunner.class)
public class WillMsgStateProcessorTest {
@@ -128,7 +128,7 @@
Response response1 = willMsgStateProcessor.put(rocksDBEngine, key1.getBytes(), value1.getBytes());
Assert.assertTrue(response1.getSuccess());
- Response scanResponse = willMsgStateProcessor.scan(rocksDBEngine, ("k1" + CTRL_0).getBytes(), ("k1" + CTRL_2).getBytes());
+ Response scanResponse = willMsgStateProcessor.scan(rocksDBEngine, ("k1" + CTRL_0).getBytes(), ("k1" + CTRL_2).getBytes(), 100);
Assert.assertEquals(value, scanResponse.getDataMapMap().get(key));
Assert.assertEquals(value1, scanResponse.getDataMapMap().get(key1));
}
diff --git a/pom.xml b/pom.xml
index e928bc0..7e9ac4e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,6 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
<parent>
<groupId>org.apache</groupId>
<artifactId>apache</artifactId>
@@ -37,7 +38,7 @@
<java.target.version>1.8</java.target.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>4.3.16.RELEASE</spring.version>
- <rocket.version>4.9.3</rocket.version>
+ <rocket.version>5.1.3</rocket.version>
<prometheus.version>0.12.0</prometheus.version>
<grpc-java.version>1.24.0</grpc-java.version>
<proto-google-common-protos.version>1.17.0</proto-google-common-protos.version>
@@ -129,6 +130,11 @@
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
+ <version>1.2.3</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
<version>1.2.9</version>
</dependency>
<dependency>