Refactor:will, retain
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
index fc07cbd..cd59680 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java
@@ -20,23 +20,19 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.mqtt.common.facade.LmqOffsetStore;
import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
import org.apache.rocketmq.mqtt.common.facade.SubscriptionPersistManager;
import org.apache.rocketmq.mqtt.common.facade.WillMsgPersistManager;
-import org.apache.rocketmq.mqtt.common.hook.HookResult;
import org.apache.rocketmq.mqtt.common.meta.IpUtil;
import org.apache.rocketmq.mqtt.common.model.Constants;
-import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
import org.apache.rocketmq.mqtt.common.model.PullResult;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.QueueOffset;
import org.apache.rocketmq.mqtt.common.model.Subscription;
import org.apache.rocketmq.mqtt.common.model.WillMessage;
-import org.apache.rocketmq.mqtt.common.util.MessageUtil;
import org.apache.rocketmq.mqtt.common.util.SpringUtils;
import org.apache.rocketmq.mqtt.cs.channel.ChannelInfo;
import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
@@ -54,7 +50,6 @@
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -115,8 +110,6 @@
private ScheduledThreadPoolExecutor persistOffsetScheduler;
private SubscriptionPersistManager subscriptionPersistManager;
- private ScheduledThreadPoolExecutor aliveService;
- private ThreadPoolExecutor executor;
/**
* channelId->session
@@ -129,26 +122,13 @@
private AtomicLong rid = new AtomicLong();
private long pullIntervalMillis = 10;
- private long checkAliveIntervalMillis = 5 * 1000;
-
@PostConstruct
public void init() {
pullService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("pull_message_thread_"));
- aliveService = new ScheduledThreadPoolExecutor(2, new ThreadFactoryImpl("check_alive_thread_"));
scheduler = new ScheduledThreadPoolExecutor(2, new ThreadFactoryImpl("loop_scheduler_"));
persistOffsetScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("persistOffset_scheduler_"));
persistOffsetScheduler.scheduleWithFixedDelay(() -> persistAllOffset(true), 5000, 5000, TimeUnit.MILLISECONDS);
pullService.scheduleWithFixedDelay(() -> pullLoop(), pullIntervalMillis, pullIntervalMillis, TimeUnit.MILLISECONDS);
- aliveService.scheduleWithFixedDelay(() -> csLoop(), 15 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
- aliveService.scheduleWithFixedDelay(() -> masterLoop(), 10 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
-
- executor = new ThreadPoolExecutor(
- 1,
- 1,
- 1,
- TimeUnit.MINUTES,
- new LinkedBlockingQueue<>(5000),
- new ThreadFactoryImpl("DispatchWillToMQ_ "));
}
private void pullLoop() {
@@ -174,168 +154,6 @@
}
}
- private void csLoop() {
- try {
- String ip = IpUtil.getLocalAddressCompatible();
- String csKey = Constants.CS_ALIVE + Constants.CTRL_1 + ip;
- String masterKey = Constants.CS_MASTER;
- long currentTime = System.currentTimeMillis();
-
- willMsgPersistManager.put(csKey, String.valueOf(currentTime)).whenComplete((result, throwable) -> {
- if (result == null || throwable != null) {
- logger.error("{} fail to put csKey", csKey, throwable);
- }
- });
-
- willMsgPersistManager.get(masterKey).whenComplete((result, throwable) -> {
- String content = new String(result);
- if (Constants.NOT_FOUND.equals(content) || masterHasDown(content)) {
- willMsgPersistManager.compareAndPut(masterKey, content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
- if (!rs || tb != null) {
- logger.error("{} fail to update master", ip, tb);
- return;
- }
- logger.info("{} update master successfully", ip);
- });
- }
- });
- } catch (Exception e) {
- logger.error("", e);
- }
- }
-
- private boolean masterHasDown(String masterValue) {
- String[] ipTime = masterValue.split(Constants.COLON);
- if (ipTime.length < 2) {
- logger.error("master ip:updateTime split error, len < 2");
- return true;
- }
-
- return System.currentTimeMillis() - Long.parseLong(ipTime[1]) > 10 * checkAliveIntervalMillis;
- }
-
- private void masterLoop() {
- try {
- String ip = IpUtil.getLocalAddressCompatible();
- if (ip == null) {
- logger.error("can not get local ip");
- return;
- }
-
- willMsgPersistManager.get(Constants.CS_MASTER).whenComplete((result, throwable) -> {
- if (result == null || throwable != null) {
- logger.error("fail to get CS_MASTER", throwable);
- return;
- }
-
- String content = new String(result);
- if (Constants.NOT_FOUND.equals(content)) {
- // no master
- return;
- }
-
- if (!content.startsWith(ip)) {
- // is not master
- return;
- }
- // master keep alive
- long currentTime = System.currentTimeMillis();
- willMsgPersistManager.compareAndPut(Constants.CS_MASTER, content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
- if (!rs || tb != null) {
- logger.error("{} fail to update master", ip, tb);
- }
- });
-
- // master to check all cs state
- String startCSKey = Constants.CS_ALIVE + Constants.CTRL_0;
- String endCSKey = Constants.CS_ALIVE + Constants.CTRL_2;
- willMsgPersistManager.scan(startCSKey, endCSKey).whenComplete((rs, tb) -> {
- if (rs == null || tb != null) {
- logger.error("{} master fail to scan cs", ip, tb);
- return;
- }
-
- if (rs.size() == 0) {
- logger.info("master scanned 0 cs");
- return;
- }
-
- for (Map.Entry<String, String> entry : rs.entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
- logger.info("master:{} scan cs:{}, heart:{} {}", ip, key, value, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Long.parseLong(value)));
- if (System.currentTimeMillis() - Long.parseLong(value) > 10 * checkAliveIntervalMillis) {
- // the cs has down
- String csIp = key.substring((Constants.CS_ALIVE + Constants.CTRL_1).length());
- handleShutDownCS(csIp);
-
- willMsgPersistManager.delete(key).whenComplete((resultDel, tbDel) -> {
- if (!resultDel || tbDel != null) {
- logger.error("fail to delete shutDown cs:{} in db", key);
- return;
- }
- logger.debug("delete shutDown cs:{} in db successfully", key);
- });
- }
- }
- });
- });
- } catch (Exception e) {
- logger.error("", e);
- }
- }
-
- private void handleShutDownCS(String ip) {
- String startClientKey = ip + Constants.CTRL_0;
- String endClientKey = ip + Constants.CTRL_2;
- willMsgPersistManager.scan(startClientKey, endClientKey).whenComplete((willMap, throwable) -> {
- if (willMap == null || throwable != null) {
- logger.error("{} master fail to scan cs", ip, throwable);
- return;
- }
-
- if (willMap.size() == 0) {
- logger.info("the cs:{} has no will", ip);
- return;
- }
-
- for (Map.Entry<String, String> entry : willMap.entrySet()) {
- logger.info("master handle will {} {}", entry.getKey(), entry.getValue());
- String willKey = entry.getKey();
- String clientId = entry.getKey().substring((ip + Constants.CTRL_1).length());
-
- WillMessage willMessage = JSON.parseObject(entry.getValue(), WillMessage.class);
- int mqttId = mqttMsgId.nextId(clientId);
- MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
- willMessage.getQos(), mqttId, willMessage.isRetain());
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- CompletableFuture<HookResult> upstreamHookResult = publishProcessor.process(new MqttMessageUpContext(), mqttMessage);
- upstreamHookResult.whenComplete((hookResult, tb) -> {
- try {
- if (!hookResult.isSuccess()) {
- executor.submit(this);
- } else {
- willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
- if (!resultDel || tbDel != null) {
- logger.error("fail to delete will message key:{}", willKey);
- return;
- }
- logger.info("delete will message key {} successfully", willKey);
- });
- }
- } catch (Throwable t) {
- logger.error("", t);
- }
- });
- }
- };
- executor.submit(runnable);
- }
- });
- }
-
@Override
public void setChannelManager(ChannelManager channelManager) {
this.channelManager = channelManager;
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java
new file mode 100644
index 0000000..e27bb47
--- /dev/null
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/WillLoop.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.session.loop;
+
+import com.alibaba.fastjson.JSON;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.mqtt.common.facade.WillMsgPersistManager;
+import org.apache.rocketmq.mqtt.common.hook.HookResult;
+import org.apache.rocketmq.mqtt.common.meta.IpUtil;
+import org.apache.rocketmq.mqtt.common.model.Constants;
+import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
+import org.apache.rocketmq.mqtt.common.model.WillMessage;
+import org.apache.rocketmq.mqtt.common.util.MessageUtil;
+import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
+import org.apache.rocketmq.mqtt.ds.upstream.processor.PublishProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class WillLoop {
+ private static Logger logger = LoggerFactory.getLogger(WillLoop.class);
+ private ScheduledThreadPoolExecutor aliveService = new ScheduledThreadPoolExecutor(2, new ThreadFactoryImpl("check_alive_thread_"));
+ private long checkAliveIntervalMillis = 5 * 1000;
+ private ThreadPoolExecutor executor;
+
+ @Resource
+ private WillMsgPersistManager willMsgPersistManager;
+
+ @Resource
+ private MqttMsgId mqttMsgId;
+
+ @Resource
+ private PublishProcessor publishProcessor;
+
+ @PostConstruct
+ public void init() {
+ aliveService.scheduleWithFixedDelay(() -> csLoop(), 15 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
+ aliveService.scheduleWithFixedDelay(() -> masterLoop(), 10 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
+
+ executor = new ThreadPoolExecutor(
+ 1,
+ 1,
+ 1,
+ TimeUnit.MINUTES,
+ new LinkedBlockingQueue<>(5000),
+ new ThreadFactoryImpl("DispatchWillToMQ_ "));
+ }
+
+ private void csLoop() {
+ try {
+ String ip = IpUtil.getLocalAddressCompatible();
+ String csKey = Constants.CS_ALIVE + Constants.CTRL_1 + ip;
+ String masterKey = Constants.CS_MASTER;
+ long currentTime = System.currentTimeMillis();
+
+ willMsgPersistManager.put(csKey, String.valueOf(currentTime)).whenComplete((result, throwable) -> {
+ if (result == null || throwable != null) {
+ logger.error("{} fail to put csKey", csKey, throwable);
+ }
+ });
+
+ willMsgPersistManager.get(masterKey).whenComplete((result, throwable) -> {
+ String content = new String(result);
+ if (Constants.NOT_FOUND.equals(content) || masterHasDown(content)) {
+ willMsgPersistManager.compareAndPut(masterKey, content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
+ if (!rs || tb != null) {
+ logger.error("{} fail to update master", ip, tb);
+ return;
+ }
+ logger.info("{} update master successfully", ip);
+ });
+ }
+ });
+ } catch (Exception e) {
+ logger.error("", e);
+ }
+ }
+
+ private boolean masterHasDown(String masterValue) {
+ String[] ipTime = masterValue.split(Constants.COLON);
+ if (ipTime.length < 2) {
+ logger.error("master ip:updateTime split error, len < 2");
+ return true;
+ }
+
+ return System.currentTimeMillis() - Long.parseLong(ipTime[1]) > 10 * checkAliveIntervalMillis;
+ }
+
+ private void masterLoop() {
+ try {
+ String ip = IpUtil.getLocalAddressCompatible();
+ if (ip == null) {
+ logger.error("can not get local ip");
+ return;
+ }
+
+ willMsgPersistManager.get(Constants.CS_MASTER).whenComplete((result, throwable) -> {
+ if (result == null || throwable != null) {
+ logger.error("fail to get CS_MASTER", throwable);
+ return;
+ }
+
+ String content = new String(result);
+ if (Constants.NOT_FOUND.equals(content)) {
+ // no master
+ return;
+ }
+
+ if (!content.startsWith(ip)) {
+ // is not master
+ return;
+ }
+ // master keep alive
+ long currentTime = System.currentTimeMillis();
+ willMsgPersistManager.compareAndPut(Constants.CS_MASTER, content, ip + Constants.COLON + currentTime).whenComplete((rs, tb) -> {
+ if (!rs || tb != null) {
+ logger.error("{} fail to update master", ip, tb);
+ }
+ });
+
+ // master to check all cs state
+ String startCSKey = Constants.CS_ALIVE + Constants.CTRL_0;
+ String endCSKey = Constants.CS_ALIVE + Constants.CTRL_2;
+ willMsgPersistManager.scan(startCSKey, endCSKey).whenComplete((rs, tb) -> {
+ if (rs == null || tb != null) {
+ logger.error("{} master fail to scan cs", ip, tb);
+ return;
+ }
+
+ if (rs.size() == 0) {
+ logger.info("master scanned 0 cs");
+ return;
+ }
+
+ for (Map.Entry<String, String> entry : rs.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ logger.info("master:{} scan cs:{}, heart:{} {}", ip, key, value, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Long.parseLong(value)));
+ if (System.currentTimeMillis() - Long.parseLong(value) > 10 * checkAliveIntervalMillis) {
+ // the cs has down
+ String csIp = key.substring((Constants.CS_ALIVE + Constants.CTRL_1).length());
+ handleShutDownCS(csIp);
+
+ willMsgPersistManager.delete(key).whenComplete((resultDel, tbDel) -> {
+ if (!resultDel || tbDel != null) {
+ logger.error("fail to delete shutDown cs:{} in db", key);
+ return;
+ }
+ logger.debug("delete shutDown cs:{} in db successfully", key);
+ });
+ }
+ }
+ });
+ });
+ } catch (Exception e) {
+ logger.error("", e);
+ }
+ }
+
+ private void handleShutDownCS(String ip) {
+ String startClientKey = ip + Constants.CTRL_0;
+ String endClientKey = ip + Constants.CTRL_2;
+ willMsgPersistManager.scan(startClientKey, endClientKey).whenComplete((willMap, throwable) -> {
+ if (willMap == null || throwable != null) {
+ logger.error("{} master fail to scan cs", ip, throwable);
+ return;
+ }
+
+ if (willMap.size() == 0) {
+ logger.info("the cs:{} has no will", ip);
+ return;
+ }
+
+ for (Map.Entry<String, String> entry : willMap.entrySet()) {
+ logger.info("master handle will {} {}", entry.getKey(), entry.getValue());
+ String willKey = entry.getKey();
+ String clientId = entry.getKey().substring((ip + Constants.CTRL_1).length());
+
+ WillMessage willMessage = JSON.parseObject(entry.getValue(), WillMessage.class);
+ int mqttId = mqttMsgId.nextId(clientId);
+ MqttPublishMessage mqttMessage = MessageUtil.toMqttMessage(willMessage.getWillTopic(), willMessage.getBody(),
+ willMessage.getQos(), mqttId, willMessage.isRetain());
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ CompletableFuture<HookResult> upstreamHookResult = publishProcessor.process(new MqttMessageUpContext(), mqttMessage);
+ upstreamHookResult.whenComplete((hookResult, tb) -> {
+ try {
+ if (!hookResult.isSuccess()) {
+ executor.submit(this);
+ } else {
+ willMsgPersistManager.delete(willKey).whenComplete((resultDel, tbDel) -> {
+ if (!resultDel || tbDel != null) {
+ logger.error("fail to delete will message key:{}", willKey);
+ return;
+ }
+ logger.info("delete will message key {} successfully", willKey);
+ });
+ }
+ } catch (Throwable t) {
+ logger.error("", t);
+ }
+ });
+ }
+ };
+ executor.submit(runnable);
+ }
+ });
+ }
+
+}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
index 5e8cd38..20ac4e5 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java
@@ -118,16 +118,15 @@
public Message parseMessage(byte[] bytes) throws Exception {
Message result;
try {
- result = ReadRequest.parseFrom(bytes);
- return result;
- } catch (Throwable ignore) {
- }
- try {
result = WriteRequest.parseFrom(bytes);
return result;
} catch (Throwable ignore) {
}
-
+ try {
+ result = ReadRequest.parseFrom(bytes);
+ return result;
+ } catch (Throwable ignore) {
+ }
throw new Exception("parse message from bytes error");
}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
index 5573cdb..81690c6 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/StateProcessor.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.mqtt.meta.raft.processor;
+import com.alipay.sofa.jraft.util.BytesUtil;
import com.google.protobuf.ByteString;
import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
import org.apache.rocketmq.mqtt.common.model.consistency.Response;
@@ -24,9 +25,13 @@
import org.apache.rocketmq.mqtt.common.meta.Constants;
import org.apache.rocketmq.mqtt.meta.rocksdb.RocksDBEngine;
import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.locks.Lock;
/**
@@ -83,7 +88,6 @@
writeLock.lock();
try {
rocksDBEngine.getRdb().put(rocksDBEngine.getWriteOptions(), key, value);
-
return Response.newBuilder()
.setSuccess(true)
.build();
@@ -100,7 +104,6 @@
writeLock.lock();
try {
rocksDBEngine.getRdb().delete(rocksDBEngine.getWriteOptions(), key);
-
return Response.newBuilder()
.setSuccess(true)
.build();
@@ -112,4 +115,58 @@
}
}
+ public Response compareAndPut(RocksDBEngine rocksDBEngine, byte[] key, byte[] expectValue, byte[] updateValue) throws Exception {
+ final Lock writeLock = rocksDBEngine.getReadWriteLock().writeLock();
+ writeLock.lock();
+ try {
+ final byte[] actual = rocksDBEngine.getRdb().get(key);
+ if (Arrays.equals(expectValue, actual)) {
+ rocksDBEngine.getRdb().put(rocksDBEngine.getWriteOptions(), key, updateValue);
+ return Response.newBuilder()
+ .setSuccess(true)
+ .build();
+ } else {
+ return Response.newBuilder()
+ .setSuccess(false)
+ .build();
+ }
+ } catch (final Exception e) {
+ logger.error("Fail to delete, k {}", key, e);
+ throw e;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public Response scan(RocksDBEngine rocksDBEngine, byte[] startKey, byte[] endKey) throws Exception {
+ Map<String, String> result = new HashMap<>();
+ final Lock readLock = rocksDBEngine.getReadWriteLock().readLock();
+ readLock.lock();
+ try {
+ final RocksIterator it = rocksDBEngine.getRdb().newIterator();
+ if (startKey == null) {
+ it.seekToFirst();
+ } else {
+ it.seek(startKey);
+ }
+ while (it.isValid()) {
+ final byte[] key = it.key();
+ if (endKey != null && BytesUtil.compare(key, endKey) >= 0) {
+ break;
+ }
+ result.put(new String(key), new String(it.value()));
+ it.next();
+ }
+ return Response.newBuilder()
+ .setSuccess(true)
+ .putAllDataMap(result)
+ .build();
+ } catch (final Exception e) {
+ logger.error("Fail to delete, startKey {}, endKey {}", startKey, endKey, e);
+ throw e;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java
index 17f432d..5376a99 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java
@@ -17,23 +17,15 @@
package org.apache.rocketmq.mqtt.meta.raft.processor;
-import com.alipay.sofa.jraft.util.BytesUtil;
+import org.apache.rocketmq.mqtt.common.meta.Constants;
import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
import org.apache.rocketmq.mqtt.common.model.consistency.Response;
import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
import org.apache.rocketmq.mqtt.meta.raft.MqttRaftServer;
import org.apache.rocketmq.mqtt.meta.raft.MqttStateMachine;
-import org.apache.rocketmq.mqtt.common.meta.Constants;
-import org.apache.rocketmq.mqtt.meta.rocksdb.RocksDBEngine;
-import org.rocksdb.RocksIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.locks.Lock;
-
import static org.apache.rocketmq.mqtt.common.meta.Constants.CATEGORY_WILL_MSG;
public class WillMsgStateProcessor extends StateProcessor {
@@ -108,62 +100,4 @@
return CATEGORY_WILL_MSG;
}
- public Response compareAndPut(RocksDBEngine rocksDBEngine, byte[] key, byte[] expectValue, byte[] updateValue) throws Exception {
- final Lock writeLock = rocksDBEngine.getReadWriteLock().writeLock();
- writeLock.lock();
- try {
- final byte[] actual = rocksDBEngine.getRdb().get(key);
- if (Arrays.equals(expectValue, actual)) {
- rocksDBEngine.getRdb().put(rocksDBEngine.getWriteOptions(), key, updateValue);
- return Response.newBuilder()
- .setSuccess(true)
- .build();
- } else {
- return Response.newBuilder()
- .setSuccess(false)
- .build();
- }
- } catch (final Exception e) {
- logger.error("Fail to delete, k {}", key, e);
- throw e;
- } finally {
- writeLock.unlock();
- }
- }
-
- public Response scan(RocksDBEngine rocksDBEngine, byte[] startKey, byte[] endKey) throws Exception {
- Map<String, String> result = new HashMap<>();
-
- final Lock readLock = rocksDBEngine.getReadWriteLock().readLock();
- readLock.lock();
- try {
-
- final RocksIterator it = rocksDBEngine.getRdb().newIterator();
- if (startKey == null) {
- it.seekToFirst();
- } else {
- it.seek(startKey);
- }
-
- while (it.isValid()) {
- final byte[] key = it.key();
- if (endKey != null && BytesUtil.compare(key, endKey) >= 0) {
- break;
- }
- result.put(new String(key), new String(it.value()));
- it.next();
- }
-
- return Response.newBuilder()
- .setSuccess(true)
- .putAllDataMap(result)
- .build();
- } catch (final Exception e) {
- logger.error("Fail to delete, startKey {}, endKey {}", startKey, endKey, e);
- throw e;
- } finally {
- readLock.unlock();
- }
- }
-
}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/rocksdb/RocksDBSnapshot.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/rocksdb/RocksDBSnapshot.java
index 7d57590..be0ff7d 100644
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/rocksdb/RocksDBSnapshot.java
+++ b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/rocksdb/RocksDBSnapshot.java
@@ -78,7 +78,7 @@
}
}
- public void writeSnapshot(final String snapshotPath) {
+ private void writeSnapshot(final String snapshotPath) {
Lock lock = rocksDBEngine.getReadWriteLock().writeLock();
lock.lock();
try (final Checkpoint checkpoint = Checkpoint.create(rocksDBEngine.getRdb())) {
@@ -98,7 +98,7 @@
}
}
- public void compressSnapshot(final SnapshotWriter writer, final LocalFileMetaOutter.LocalFileMeta.Builder metaBuilder, final Closure done) {
+ private void compressSnapshot(final SnapshotWriter writer, final LocalFileMetaOutter.LocalFileMeta.Builder metaBuilder, final Closure done) {
final String writerPath = writer.getPath();
final String outputFile = Paths.get(writerPath, SNAPSHOT_ARCHIVE).toString();
try {
@@ -116,7 +116,7 @@
}
}
- public void decompressSnapshot(final String readerPath, final LocalFileMetaOutter.LocalFileMeta meta) throws Throwable {
+ private void decompressSnapshot(final String readerPath, final LocalFileMetaOutter.LocalFileMeta meta) throws Throwable {
final String sourceFile = Paths.get(readerPath, SNAPSHOT_ARCHIVE).toString();
final Checksum checksum = new CRC64();
ZipStrategyManager.getDefault().deCompress(sourceFile, readerPath, checksum);
@@ -125,7 +125,7 @@
}
}
- public void readSnapshot(final String snapshotPath) {
+ private void readSnapshot(final String snapshotPath) {
Lock lock = rocksDBEngine.getReadWriteLock().readLock();
lock.lock();
try {
@@ -149,7 +149,7 @@
}
}
- public LocalFileMetaOutter.LocalFileMeta.Builder writeMetadata(final LocalFileMetaOutter.LocalFileMeta metadata) {
+ private LocalFileMetaOutter.LocalFileMeta.Builder writeMetadata(final LocalFileMetaOutter.LocalFileMeta metadata) {
if (metadata == null) {
return LocalFileMetaOutter.LocalFileMeta.newBuilder();
}
diff --git a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/util/RaftExecutor.java b/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/util/RaftExecutor.java
deleted file mode 100644
index a6cfca2..0000000
--- a/mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/util/RaftExecutor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.mqtt.meta.util;
-
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-
-/**
- * raft executor.
- */
-public final class RaftExecutor {
-
- private static ExecutorService raftSnapshotExecutor;
-
- private RaftExecutor() {
- init();
- }
-
- public static void init() {
- raftSnapshotExecutor = new ScheduledThreadPoolExecutor(2, new ThreadFactoryImpl("loop_snapshot"));
- }
-
- public static void doSnapshot(Runnable runnable) {
- raftSnapshotExecutor.execute(runnable);
- }
-
-}