Make mqtt component cope with restarts. Improve examples and starter
diff --git a/rcomp-app/pom.xml b/rcomp-app/pom.xml
index 0fc6c96..f6400a3 100644
--- a/rcomp-app/pom.xml
+++ b/rcomp-app/pom.xml
@@ -6,7 +6,7 @@
 		<artifactId>rcomp-parent</artifactId>
 		<version>1.0.0-SNAPSHOT</version>
 	</parent>
-	<artifactId>rcomp-index</artifactId>
+	<artifactId>rcomp-app</artifactId>
 
 	<properties>
 		<rcomp.version>${project.version}</rcomp.version>
diff --git a/rcomp-examples/src/main/java/reactortest/ByteArrayConverter.java b/rcomp-examples/src/main/java/reactortest/ByteArrayConverter.java
index 0d1b40b..851bfaf 100644
--- a/rcomp-examples/src/main/java/reactortest/ByteArrayConverter.java
+++ b/rcomp-examples/src/main/java/reactortest/ByteArrayConverter.java
@@ -23,5 +23,9 @@
         return fromString(in.toString());
     }
     
+    public static byte[] fromLong(Long in) {
+        return fromString(in.toString());
+    }
+    
     
 }
diff --git a/rcomp-examples/src/main/java/reactortest/EventAdminExample.java b/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
index 1086cc4..e9f0ddf 100644
--- a/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
+++ b/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
@@ -20,6 +20,7 @@
     @Reference(target="(name=eventAdmin)")
     MComponent eventAdmin;
 
+    @SuppressWarnings("rawtypes")
     @Activate
     public void start() throws Exception {
         Publisher<Map> fromTopic = eventAdmin.from("input", Map.class);
diff --git a/rcomp-examples/src/main/java/reactortest/MqttEmitter.java b/rcomp-examples/src/main/java/reactortest/MqttEmitter.java
new file mode 100644
index 0000000..14e10f1
--- /dev/null
+++ b/rcomp-examples/src/main/java/reactortest/MqttEmitter.java
@@ -0,0 +1,37 @@
+package reactortest;
+
+import java.time.Duration;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.reactivestreams.Subscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import component.api.MComponent;
+import reactor.core.publisher.Flux;
+
+@Component(immediate=true)
+public class MqttEmitter {
+    Logger LOG = LoggerFactory.getLogger(MqttEmitter.class);
+    
+    @Reference(target="(name=mqtt)")
+    MComponent mqtt;
+
+    @Activate
+    public void start() throws Exception {
+        Subscriber<byte[]> toTopic = mqtt.to("input", byte[].class);
+        Flux.interval(Duration.ofSeconds(1))
+            .map(ByteArrayConverter::fromLong)
+            .subscribe(toTopic);
+        LOG.info("mqtt test component started4");
+    }
+    
+    @Deactivate
+    public void stop() throws Exception {
+        LOG.info("mqtt test component stopped");
+    }
+
+}
diff --git a/rcomp-examples/src/main/java/reactortest/MqttExample.java b/rcomp-examples/src/main/java/reactortest/MqttExample.java
index 1c37247..602d743 100644
--- a/rcomp-examples/src/main/java/reactortest/MqttExample.java
+++ b/rcomp-examples/src/main/java/reactortest/MqttExample.java
@@ -21,7 +21,7 @@
 
     @Activate
     public void start() throws Exception {
-        LOG.info("Starting mqtt test component");
+        LOG.info("Starting mqtt test component2");
         Publisher<byte[]> fromTopic = mqtt.from("input", byte[].class);
         Subscriber<byte[]> toTopic = mqtt.to("output", byte[].class);
         Flux.from(fromTopic)
diff --git a/rcomp-examples/src/main/java/reactortest/MqttReceiver.java b/rcomp-examples/src/main/java/reactortest/MqttReceiver.java
new file mode 100644
index 0000000..8c0a20b
--- /dev/null
+++ b/rcomp-examples/src/main/java/reactortest/MqttReceiver.java
@@ -0,0 +1,36 @@
+package reactortest;
+
+import java.util.function.Consumer;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import component.api.MComponent;
+import reactor.core.publisher.Flux;
+
+@Component(immediate=true)
+public class MqttReceiver implements Consumer<Double>{
+    Logger LOG = LoggerFactory.getLogger(MqttReceiver.class);
+    
+    @Reference(target="(name=mqtt)")
+    MComponent mqtt;
+
+    @Activate
+    public void start() throws Exception {
+        LOG.info("Starting mqtt receiver");
+        Publisher<byte[]> fromTopic = mqtt.from("output", byte[].class);
+        Flux.from(fromTopic)
+            .map(ByteArrayConverter::asDouble)
+            .subscribe(this);
+    }
+
+    @Override
+    public void accept(Double average) {
+        System.out.println("Received average value of " + average);
+    }
+
+}
diff --git a/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
index d72a88c..64c7905 100644
--- a/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
@@ -1,7 +1,11 @@
 package component.mqtt;
 
+import java.util.HashSet;
+import java.util.Set;
+
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -15,6 +19,7 @@
 public class MqttComponent implements MComponent {
     
     MqttClient client;
+    private Set<MqttDestination<?>> destinations = new HashSet<>();
 
     @ObjectClassDefinition(name = "MQTT config")
     @interface MqttConfig {
@@ -24,12 +29,19 @@
     
     @Activate
     public void activate(MqttConfig config) throws MqttException {
-        client = new MqttClient(config.serverUrl(), MqttClient.generateClientId());
+        client = new MqttClient(config.serverUrl(), MqttClient.generateClientId(),
+                                new MemoryPersistence());
         client.connect();
     }
     
     @Deactivate
     public void deactivate() throws MqttException {
+        for (MqttDestination<?> destination : destinations) {
+            try {
+                destination.close(); 
+            } catch (Exception e) {
+            }
+        }
         client.disconnect();
         client.close();
     }
@@ -41,7 +53,9 @@
     
     @Override
     public <T> Subscriber<T> to(String topic, Class<T> type) {
-        return new MqttDestination<T>(client, topic, type);
+        MqttDestination<T> destination = new MqttDestination<T>(client, topic, type);
+        destinations.add(destination);
+        return destination;
     }
 
 }
diff --git a/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
index 52236cd..8c31518 100644
--- a/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
@@ -5,7 +5,7 @@
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-public class MqttDestination<T> implements Subscriber<T> {
+public class MqttDestination<T> implements Subscriber<T>, AutoCloseable {
     
     private MqttClient client;
     private String topic;
@@ -51,4 +51,11 @@
         System.out.println("oncomplete");
     }
 
+    @Override
+    public void close() throws Exception {
+        if (subscription != null) {
+            subscription.cancel();
+            subscription = null;
+        }
+    }
 }
diff --git a/rcomp-mqtt/src/test/java/component/mqtt/Test1.java b/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
index 2674c3f..b20f119 100644
--- a/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
+++ b/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
@@ -19,7 +19,7 @@
     @Test
     public void testMQtt() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
-        MqttClient client = new MqttClient("tcp://192.168.0.126:1883", MqttClient.generateClientId(), new MemoryPersistence());
+        MqttClient client = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId(), new MemoryPersistence());
         client.connect();
         MqttComponent mqtt = new MqttComponent();
         mqtt.client = client;