Make mqtt component cope with restarts. Improve examples and starter
diff --git a/rcomp-app/pom.xml b/rcomp-app/pom.xml
index 0fc6c96..f6400a3 100644
--- a/rcomp-app/pom.xml
+++ b/rcomp-app/pom.xml
@@ -6,7 +6,7 @@
<artifactId>rcomp-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
- <artifactId>rcomp-index</artifactId>
+ <artifactId>rcomp-app</artifactId>
<properties>
<rcomp.version>${project.version}</rcomp.version>
diff --git a/rcomp-examples/src/main/java/reactortest/ByteArrayConverter.java b/rcomp-examples/src/main/java/reactortest/ByteArrayConverter.java
index 0d1b40b..851bfaf 100644
--- a/rcomp-examples/src/main/java/reactortest/ByteArrayConverter.java
+++ b/rcomp-examples/src/main/java/reactortest/ByteArrayConverter.java
@@ -23,5 +23,9 @@
return fromString(in.toString());
}
+ public static byte[] fromLong(Long in) {
+ return fromString(in.toString());
+ }
+
}
diff --git a/rcomp-examples/src/main/java/reactortest/EventAdminExample.java b/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
index 1086cc4..e9f0ddf 100644
--- a/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
+++ b/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
@@ -20,6 +20,7 @@
@Reference(target="(name=eventAdmin)")
MComponent eventAdmin;
+ @SuppressWarnings("rawtypes")
@Activate
public void start() throws Exception {
Publisher<Map> fromTopic = eventAdmin.from("input", Map.class);
diff --git a/rcomp-examples/src/main/java/reactortest/MqttEmitter.java b/rcomp-examples/src/main/java/reactortest/MqttEmitter.java
new file mode 100644
index 0000000..14e10f1
--- /dev/null
+++ b/rcomp-examples/src/main/java/reactortest/MqttEmitter.java
@@ -0,0 +1,37 @@
+package reactortest;
+
+import java.time.Duration;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.reactivestreams.Subscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import component.api.MComponent;
+import reactor.core.publisher.Flux;
+
+@Component(immediate=true)
+public class MqttEmitter {
+ Logger LOG = LoggerFactory.getLogger(MqttEmitter.class);
+
+ @Reference(target="(name=mqtt)")
+ MComponent mqtt;
+
+ @Activate
+ public void start() throws Exception {
+ Subscriber<byte[]> toTopic = mqtt.to("input", byte[].class);
+ Flux.interval(Duration.ofSeconds(1))
+ .map(ByteArrayConverter::fromLong)
+ .subscribe(toTopic);
+ LOG.info("mqtt test component started4");
+ }
+
+ @Deactivate
+ public void stop() throws Exception {
+ LOG.info("mqtt test component stopped");
+ }
+
+}
diff --git a/rcomp-examples/src/main/java/reactortest/MqttExample.java b/rcomp-examples/src/main/java/reactortest/MqttExample.java
index 1c37247..602d743 100644
--- a/rcomp-examples/src/main/java/reactortest/MqttExample.java
+++ b/rcomp-examples/src/main/java/reactortest/MqttExample.java
@@ -21,7 +21,7 @@
@Activate
public void start() throws Exception {
- LOG.info("Starting mqtt test component");
+ LOG.info("Starting mqtt test component2");
Publisher<byte[]> fromTopic = mqtt.from("input", byte[].class);
Subscriber<byte[]> toTopic = mqtt.to("output", byte[].class);
Flux.from(fromTopic)
diff --git a/rcomp-examples/src/main/java/reactortest/MqttReceiver.java b/rcomp-examples/src/main/java/reactortest/MqttReceiver.java
new file mode 100644
index 0000000..8c0a20b
--- /dev/null
+++ b/rcomp-examples/src/main/java/reactortest/MqttReceiver.java
@@ -0,0 +1,36 @@
+package reactortest;
+
+import java.util.function.Consumer;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import component.api.MComponent;
+import reactor.core.publisher.Flux;
+
+@Component(immediate=true)
+public class MqttReceiver implements Consumer<Double>{
+ Logger LOG = LoggerFactory.getLogger(MqttReceiver.class);
+
+ @Reference(target="(name=mqtt)")
+ MComponent mqtt;
+
+ @Activate
+ public void start() throws Exception {
+ LOG.info("Starting mqtt receiver");
+ Publisher<byte[]> fromTopic = mqtt.from("output", byte[].class);
+ Flux.from(fromTopic)
+ .map(ByteArrayConverter::asDouble)
+ .subscribe(this);
+ }
+
+ @Override
+ public void accept(Double average) {
+ System.out.println("Received average value of " + average);
+ }
+
+}
diff --git a/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
index d72a88c..64c7905 100644
--- a/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
@@ -1,7 +1,11 @@
package component.mqtt;
+import java.util.HashSet;
+import java.util.Set;
+
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -15,6 +19,7 @@
public class MqttComponent implements MComponent {
MqttClient client;
+ private Set<MqttDestination<?>> destinations = new HashSet<>();
@ObjectClassDefinition(name = "MQTT config")
@interface MqttConfig {
@@ -24,12 +29,19 @@
@Activate
public void activate(MqttConfig config) throws MqttException {
- client = new MqttClient(config.serverUrl(), MqttClient.generateClientId());
+ client = new MqttClient(config.serverUrl(), MqttClient.generateClientId(),
+ new MemoryPersistence());
client.connect();
}
@Deactivate
public void deactivate() throws MqttException {
+ for (MqttDestination<?> destination : destinations) {
+ try {
+ destination.close();
+ } catch (Exception e) {
+ }
+ }
client.disconnect();
client.close();
}
@@ -41,7 +53,9 @@
@Override
public <T> Subscriber<T> to(String topic, Class<T> type) {
- return new MqttDestination<T>(client, topic, type);
+ MqttDestination<T> destination = new MqttDestination<T>(client, topic, type);
+ destinations.add(destination);
+ return destination;
}
}
diff --git a/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
index 52236cd..8c31518 100644
--- a/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
@@ -5,7 +5,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-public class MqttDestination<T> implements Subscriber<T> {
+public class MqttDestination<T> implements Subscriber<T>, AutoCloseable {
private MqttClient client;
private String topic;
@@ -51,4 +51,11 @@
System.out.println("oncomplete");
}
+ @Override
+ public void close() throws Exception {
+ if (subscription != null) {
+ subscription.cancel();
+ subscription = null;
+ }
+ }
}
diff --git a/rcomp-mqtt/src/test/java/component/mqtt/Test1.java b/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
index 2674c3f..b20f119 100644
--- a/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
+++ b/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
@@ -19,7 +19,7 @@
@Test
public void testMQtt() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- MqttClient client = new MqttClient("tcp://192.168.0.126:1883", MqttClient.generateClientId(), new MemoryPersistence());
+ MqttClient client = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId(), new MemoryPersistence());
client.connect();
MqttComponent mqtt = new MqttComponent();
mqtt.client = client;