make the server support heartbeat to release invalid long connections (#814)
diff --git a/rocketmq-iot-bridge/src/main/java/org/apache/rocketmq/iot/MQTTBridge.java b/rocketmq-iot-bridge/src/main/java/org/apache/rocketmq/iot/MQTTBridge.java
index fc7af85..63c77ea 100644
--- a/rocketmq-iot-bridge/src/main/java/org/apache/rocketmq/iot/MQTTBridge.java
+++ b/rocketmq-iot-bridge/src/main/java/org/apache/rocketmq/iot/MQTTBridge.java
@@ -27,6 +27,7 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
+import io.netty.handler.timeout.IdleStateHandler;
import org.apache.rocketmq.iot.common.config.MqttBridgeConfig;
import org.apache.rocketmq.iot.common.data.Message;
import org.apache.rocketmq.iot.connection.client.ClientManager;
@@ -51,6 +52,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
+
public class MQTTBridge {
private Logger logger = LoggerFactory.getLogger(MQTTBridge.class);
@@ -96,6 +99,7 @@
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast(new IdleStateHandler(0, 0, bridgeConfig.getHeartbeatAllidleTime(), TimeUnit.SECONDS));
pipeline.addLast("mqtt-decoder", new MqttDecoder());
pipeline.addLast("mqtt-encoder", MqttEncoder.INSTANCE);
pipeline.addLast("channel-idle-handler", new MqttIdleHandler());
diff --git a/rocketmq-iot-bridge/src/main/java/org/apache/rocketmq/iot/common/config/MqttBridgeConfig.java b/rocketmq-iot-bridge/src/main/java/org/apache/rocketmq/iot/common/config/MqttBridgeConfig.java
index f7eb011..dd6507d 100644
--- a/rocketmq-iot-bridge/src/main/java/org/apache/rocketmq/iot/common/config/MqttBridgeConfig.java
+++ b/rocketmq-iot-bridge/src/main/java/org/apache/rocketmq/iot/common/config/MqttBridgeConfig.java
@@ -29,6 +29,7 @@
private int bossGroupThreadNum;
private int workerGroupThreadNum;
private int socketBacklogSize;
+ private long heartbeatAllidleTime;
private boolean enableRocketMQStore;
private String rmqNamesrvAddr;
@@ -56,6 +57,8 @@
MQTT_SERVER_WORKER_GROUP_THREAD_NUM_DEFAULT));
this.socketBacklogSize = Integer.parseInt(System.getProperty(MQTT_SERVER_SOCKET_BACKLOG_SIZE,
MQTT_SERVER_SOCKET_BACKLOG_SIZE_DEFAULT));
+ this.heartbeatAllidleTime = Long.parseLong(System.getProperty(MQTT_BROKER_HEARTBEAT_ALLIDLETIME,
+ MQTT_BROKER_HEARTBEAT_ALLIDLETIME_DEFAULT));
this.enableRocketMQStore = Boolean.parseBoolean(System.getProperty(MQTT_ROCKETMQ_STORE_ENABLED, MQTT_ROCKETMQ_STORE_ENABLED_DEFAULT));
if (enableRocketMQStore) {
@@ -91,6 +94,10 @@
return socketBacklogSize;
}
+ public long getHeartbeatAllidleTime() {
+ return heartbeatAllidleTime;
+ }
+
public boolean isEnableRocketMQStore() {
return enableRocketMQStore;
}
@@ -126,6 +133,7 @@
", bossGroupThreadNum=" + bossGroupThreadNum +
", workerGroupThreadNum=" + workerGroupThreadNum +
", socketBacklogSize=" + socketBacklogSize +
+ ", heartbeatAllidleTime=" + heartbeatAllidleTime +
", enableRocketMQStore=" + enableRocketMQStore +
", rmqNamesrvAddr='" + rmqNamesrvAddr + '\'' +
", rmqProductGroup='" + rmqProductGroup + '\'' +
diff --git a/rocketmq-iot-bridge/src/main/java/org/apache/rocketmq/iot/common/configuration/MQTTBridgeConfiguration.java b/rocketmq-iot-bridge/src/main/java/org/apache/rocketmq/iot/common/configuration/MQTTBridgeConfiguration.java
index 6071d46..9850470 100644
--- a/rocketmq-iot-bridge/src/main/java/org/apache/rocketmq/iot/common/configuration/MQTTBridgeConfiguration.java
+++ b/rocketmq-iot-bridge/src/main/java/org/apache/rocketmq/iot/common/configuration/MQTTBridgeConfiguration.java
@@ -36,6 +36,9 @@
public static final String MQTT_SERVER_SOCKET_BACKLOG_SIZE = "iot.mqtt.server.socket.backlog.size";
public static final String MQTT_SERVER_SOCKET_BACKLOG_SIZE_DEFAULT = "1024";
+ public static final String MQTT_BROKER_HEARTBEAT_ALLIDLETIME = "iot.mqtt.server.heartbeat.allidletime";
+ public static final String MQTT_BROKER_HEARTBEAT_ALLIDLETIME_DEFAULT = "120";
+
/**
* iot mqtt bridge broker store configuration
*/