will, retain
diff --git a/README.md b/README.md
index 042d93e..eae4ec8 100644
--- a/README.md
+++ b/README.md
@@ -85,7 +85,7 @@
The mqtt-example module has written basic usage example code, which can be used for reference
## Protocol Version
-The currently supported protocol version is [MQTT 3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.pdf), but the will and retain features are not supported yet
+The currently supported protocol version is [MQTT 3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.pdf).
## Authentication
At present, an implementation based on the HmacSHA1 signature algorithm is provided by default, Refer to **AuthManagerSample**. Users can customize other implementations to meet the needs of businesses to flexibly verify resources and identities.
diff --git a/distribution/conf/meta.conf b/distribution/conf/meta.conf
index aa4cba7..d4ee418 100644
--- a/distribution/conf/meta.conf
+++ b/distribution/conf/meta.conf
@@ -16,4 +16,3 @@
selfAddress=
membersAddress=
-maxRetainedMessageNum=10000
\ No newline at end of file
diff --git a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainConsumer.java b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainConsumer.java
index 3edf003..0f869c0 100644
--- a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainConsumer.java
+++ b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainConsumer.java
@@ -45,7 +45,7 @@
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println(recvClientId + " connect success to " + serverURI);
try {
- final String topicFilter[] = {firstTopic + "/r3", firstTopic + "/retainTopic1", firstTopic + "/willTopic1",};
+ final String topicFilter[] = {firstTopic + "/r3", firstTopic + "/retainTopic/+", firstTopic + "/willTopic1",};
final int[] qos = {1, 1, 1};
mqttClient.subscribe(topicFilter, qos);
} catch (Exception e) {
diff --git a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainProducer.java b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainProducer.java
index 18d4aae..064c1f0 100644
--- a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainProducer.java
+++ b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/MqttWillRetainProducer.java
@@ -68,7 +68,7 @@
e.printStackTrace();
}
long interval = 1000;
- int c = 10;
+ int c = 3;
for (int i = 0; i < c; i++) {
String msg = "r3_" + System.currentTimeMillis() + "_" + i;
MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
@@ -76,7 +76,7 @@
String mqttSendTopic = firstTopic + "/r3";
if (i >= c - 1) {
message.setRetained(true);
- mqttSendTopic = firstTopic + "/retainTopic1";
+ mqttSendTopic = firstTopic + "/retainTopic/1";
}
mqttClient.publish(mqttSendTopic, message);
System.out.println(now() + "send: " + mqttSendTopic + ", " + msg);
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java
index 8133773..376ab63 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/config/MetaConf.java
@@ -37,9 +37,9 @@
private int metaPort = 25000;
private String selfAddress;
private String membersAddress;
- private int maxRetainedMessageNum;
+ private int maxRetainedTopicNum = 10000;
private int electionTimeoutMs = 1000;
- private int snapshotIntervalSecs = 1000;
+ private int snapshotIntervalSecs = 60 * 1000;
private String raftServiceName = System.getenv("RaftServiceName");
public MetaConf() throws IOException {
@@ -128,12 +128,12 @@
this.snapshotIntervalSecs = snapshotIntervalSecs;
}
- public int getMaxRetainedMessageNum() {
- return maxRetainedMessageNum;
+ public int getMaxRetainedTopicNum() {
+ return maxRetainedTopicNum;
}
- public void setMaxRetainedMessageNum(int maxRetainedMessageNum) {
- this.maxRetainedMessageNum = maxRetainedMessageNum;
+ public void setMaxRetainedTopicNum(int maxRetainedTopicNum) {
+ this.maxRetainedTopicNum = maxRetainedTopicNum;
}
public String getRaftServiceName() {
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
index fb377e2..5c1b428 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java
@@ -111,6 +111,9 @@
new LinkedBlockingQueue<>(10000),
new ThreadFactoryImpl("requestExecutor_"));
+ registerStateProcessor(new RetainedMsgStateProcessor(this, metaConf.getMaxRetainedTopicNum())); //add retained msg processor
+ registerStateProcessor(new WillMsgStateProcessor(this));
+
rt = RouteTable.getInstance();
localPeerId = PeerId.parsePeer(metaConf.getSelfAddress());
rpcServer = createRpcServer(this, localPeerId);
@@ -134,9 +137,6 @@
CliOptions cliOptions = new CliOptions();
this.cliService = RaftServiceFactory.createAndInitCliService(cliOptions);
this.cliClientService = (CliClientServiceImpl) ((CliServiceImpl) this.cliService).getCliClientService();
-
- registerStateProcessor(new RetainedMsgStateProcessor(this, metaConf.getMaxRetainedMessageNum())); //add retained msg processor
- registerStateProcessor(new WillMsgStateProcessor(this));
}
private void refreshLeader() {
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
index 20ac4e5..b763a29 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
@@ -100,8 +100,7 @@
}
} catch (Throwable t) {
LOGGER.error("stateMachine meet critical error", t);
- iterator.setErrorAndRollback(index - applied,
- new Status(RaftError.ESTATEMACHINE, "StateMachine meet critical error: %s.", t.toString()));
+ //iterator.setErrorAndRollback(index - applied, new Status(RaftError.ESTATEMACHINE, "StateMachine meet critical error: %s.", t.toString()));
}
}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
index 134f62f..e2c7c23 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/RetainedMsgStateProcessor.java
@@ -41,11 +41,11 @@
private static Logger logger = LoggerFactory.getLogger(RetainedMsgStateProcessor.class);
private final ConcurrentHashMap<String, Trie<String, String>> retainedMsgTopicTrie = new ConcurrentHashMap<>(); //key:firstTopic value:retained topic Trie
private MqttRaftServer server;
- private int maxRetainedMessageNum;
+ private int maxRetainedTopicNum;
- public RetainedMsgStateProcessor(MqttRaftServer server, int maxRetainedMessageNum) {
+ public RetainedMsgStateProcessor(MqttRaftServer server, int maxRetainedTopicNum) {
this.server = server;
- this.maxRetainedMessageNum = maxRetainedMessageNum;
+ this.maxRetainedTopicNum = maxRetainedTopicNum;
}
@Override
@@ -65,20 +65,21 @@
if (operation.equals("topic")) { //return retained msg
return get(sm.getRocksDBEngine(), topic.getBytes(StandardCharsets.UTF_8));
} else { //return retain msgs of matched Topic
- if (!retainedMsgTopicTrie.containsKey(firstTopic)) {
+ String wrapTrieFirstTopic = wrapTrieFirstTopic(firstTopic);
+ if (!retainedMsgTopicTrie.containsKey(wrapTrieFirstTopic)) {
Trie<String, String> newTrie = new Trie<>();
- Response value = get(sm.getRocksDBEngine(), firstTopic.getBytes(StandardCharsets.UTF_8));
+ Response value = get(sm.getRocksDBEngine(), wrapTrieFirstTopic.getBytes(StandardCharsets.UTF_8));
if (value != null && value.getData() != null) {
newTrie = JSON.parseObject(value.getData().toStringUtf8(), Trie.class);
}
- retainedMsgTopicTrie.put(firstTopic, newTrie);
+ retainedMsgTopicTrie.put(wrapTrieFirstTopic, newTrie);
return Response.newBuilder()
.setSuccess(true)
.setData(ByteString.copyFrom(JSON.toJSONBytes(new ArrayList<byte[]>())))
.build();
}
- Trie<String, String> tmpTrie = retainedMsgTopicTrie.get(firstTopic);
+ Trie<String, String> tmpTrie = retainedMsgTopicTrie.get(wrapTrieFirstTopic);
Set<String> matchTopics = tmpTrie.getAllPath(topic);
ArrayList<ByteString> msgResults = new ArrayList<>();
@@ -104,29 +105,27 @@
}
boolean setRetainedMsg(RocksDBEngine rocksDBEngine, String firstTopic, String topic, boolean isEmpty, byte[] msg) throws Exception {
-
+ String wrapTrieFirstTopic = wrapTrieFirstTopic(firstTopic);
// if the trie of firstTopic doesn't exist
- if (!retainedMsgTopicTrie.containsKey(firstTopic)) {
- retainedMsgTopicTrie.put(TopicUtils.normalizeTopic(firstTopic), new Trie<String, String>());
+ if (!retainedMsgTopicTrie.containsKey(wrapTrieFirstTopic)) {
+ retainedMsgTopicTrie.put(wrapTrieFirstTopic, new Trie<String, String>());
}
-
if (isEmpty) {
//delete from trie
logger.info("Delete the topic {} retained message", topic);
delete(rocksDBEngine, topic.getBytes(StandardCharsets.UTF_8));
- Trie<String, String> trie = retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(firstTopic));
+ Trie<String, String> trie = retainedMsgTopicTrie.get(wrapTrieFirstTopic);
if (trie != null) {
trie.deleteTrieNode(topic, "");
}
- put(rocksDBEngine, firstTopic.getBytes(StandardCharsets.UTF_8), JSON.toJSONBytes(trie));
+ put(rocksDBEngine, wrapTrieFirstTopic.getBytes(StandardCharsets.UTF_8), JSON.toJSONBytes(trie));
} else {
//Add to trie
- Trie<String, String> trie = retainedMsgTopicTrie.get(TopicUtils.normalizeTopic(firstTopic));
- logger.info("maxRetainedMessageNum:{}", maxRetainedMessageNum);
- if (trie.getNodePath().size() < maxRetainedMessageNum) {
+ Trie<String, String> trie = retainedMsgTopicTrie.get(wrapTrieFirstTopic);
+ if (trie.getNodePath().size() < maxRetainedTopicNum) {
put(rocksDBEngine, topic.getBytes(StandardCharsets.UTF_8), msg);
trie.addNode(topic, "", "");
- put(rocksDBEngine, firstTopic.getBytes(StandardCharsets.UTF_8), JSON.toJSONBytes(trie));
+ put(rocksDBEngine, wrapTrieFirstTopic.getBytes(StandardCharsets.UTF_8), JSON.toJSONBytes(trie));
return true;
} else {
return false;
@@ -135,6 +134,10 @@
return true;
}
+ private String wrapTrieFirstTopic(String firstTopic){
+ return "$"+firstTopic+"$";
+ }
+
@Override
public Response onWriteRequest(WriteRequest writeRequest) {
try {