Add Closeable support
diff --git a/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/CloseableSubscriber.java b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/CloseableSubscriber.java
new file mode 100644
index 0000000..3506888
--- /dev/null
+++ b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/CloseableSubscriber.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.rcomp.api;
+
+import java.io.Closeable;
+
+import org.reactivestreams.Subscriber;
+
+public interface CloseableSubscriber<T> extends Subscriber<T>, Closeable {
+    
+}
diff --git a/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/CloseableSubscriberAdapter.java b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/CloseableSubscriberAdapter.java
new file mode 100644
index 0000000..d724f16
--- /dev/null
+++ b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/CloseableSubscriberAdapter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.rcomp.api;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * Adapts an arbitrary subscriber implementing Closeable to CloseableSubscriber
+ */
+public class CloseableSubscriberAdapter<T> implements CloseableSubscriber<T> {
+    
+    private Subscriber<T> subscriber;
+
+    public CloseableSubscriberAdapter(Subscriber<T> subscriber) {
+//        if (!(subscriber instanceof Closeable)) {
+//            throw new IllegalArgumentException("Subscriber must implement Closeable");
+//        }
+        this.subscriber = subscriber;
+    }
+
+    @Override
+    public void onSubscribe(Subscription s) {
+        subscriber.onSubscribe(s);
+    }
+
+    @Override
+    public void onNext(T t) {
+        subscriber.onNext(t);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+        subscriber.onError(t);
+    }
+
+    @Override
+    public void onComplete() {
+        subscriber.onComplete();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (subscriber instanceof Closeable) {
+            ((Closeable)subscriber).close();
+        }
+    }
+    
+}
\ No newline at end of file
diff --git a/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/ProvComp.java b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/ProvComp.java
index 18096f4..8cb80eb 100644
--- a/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/ProvComp.java
+++ b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/ProvComp.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.karaf.rcomp.api;
 
 import aQute.bnd.annotation.headers.ProvideCapability;
diff --git a/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/RComponent.java b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/RComponent.java
index 323ecff..c381be4 100644
--- a/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/RComponent.java
+++ b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/RComponent.java
@@ -17,9 +17,8 @@
 package org.apache.karaf.rcomp.api;
 
 import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
 
 public interface RComponent {
     <T> Publisher<T> from(String destination, Class<T> type);
-    <T> Subscriber<T> to(String destination, Class<T> type);
+    <T> CloseableSubscriber<T> to(String destination, Class<T> type);
 }
diff --git a/rcomp-camel/src/main/java/org/apache/karaf/rcomp/camel/CamelComponent.java b/rcomp-camel/src/main/java/org/apache/karaf/rcomp/camel/CamelComponent.java
index 90334d6..d8a8836 100644
--- a/rcomp-camel/src/main/java/org/apache/karaf/rcomp/camel/CamelComponent.java
+++ b/rcomp-camel/src/main/java/org/apache/karaf/rcomp/camel/CamelComponent.java
@@ -21,6 +21,8 @@
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
 import org.apache.camel.core.osgi.OsgiDefaultCamelContext;
 import org.apache.camel.core.osgi.OsgiServiceRegistry;
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
+import org.apache.karaf.rcomp.api.CloseableSubscriberAdapter;
 import org.apache.karaf.rcomp.api.ProvComp;
 import org.apache.karaf.rcomp.api.RComponent;
 import org.osgi.framework.BundleContext;
@@ -29,7 +31,6 @@
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
 import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
 
 @ProvComp(name = "camel")
 @Component(property = "name=camel")
@@ -69,7 +70,7 @@
     }
 
     @Override
-    public <T> Subscriber<T> to(String topic, Class<T> type) {
-        return camelreactive.subscriber(topic, type);
+    public <T> CloseableSubscriber<T> to(String topic, Class<T> type) {
+        return new CloseableSubscriberAdapter<T>(camelreactive.subscriber(topic, type));
     }
 }
diff --git a/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminComponent.java b/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminComponent.java
index 3f3158e..0c8ea2e 100644
--- a/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminComponent.java
+++ b/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminComponent.java
@@ -16,6 +16,7 @@
  */
 package org.apache.karaf.rcomp.eventadmin;
 
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
 import org.apache.karaf.rcomp.api.ProvComp;
 import org.apache.karaf.rcomp.api.RComponent;
 import org.osgi.framework.BundleContext;
@@ -25,7 +26,6 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.EventAdmin;
 import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
 
 @ProvComp(name="eventAdmin")
 @Component(property="name=eventAdmin")
@@ -52,7 +52,7 @@
     }  
     
     @Override
-    public <T> Subscriber<T> to(String topic, Class<T> type) {
+    public <T> CloseableSubscriber<T> to(String topic, Class<T> type) {
         return new EventAdminDestination<T>(client, topic, type);
     }
 
diff --git a/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminDestination.java b/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminDestination.java
index 61d3b5c..f692e98 100644
--- a/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminDestination.java
+++ b/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminDestination.java
@@ -18,15 +18,13 @@
 
 import java.util.Map;
 
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
-import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
-import org.slf4j.LoggerFactory;
 
-public class EventAdminDestination<T> implements Subscriber<T> {
-    private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(EventAdminDestination.class);
-    
+public class EventAdminDestination<T> implements CloseableSubscriber<T> {
+
     private EventAdmin client;
     private String topic;
     private Subscription subscription;
@@ -76,4 +74,9 @@
     public void onComplete() {
     }
 
+    @Override
+    public void close() {
+        this.subscription.cancel();
+    }
+
 }
diff --git a/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/EventAdminExample.java b/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/eventadmin/EventAdminExample.java
similarity index 96%
rename from rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/EventAdminExample.java
rename to rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/eventadmin/EventAdminExample.java
index a309631..6fea04f 100644
--- a/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/EventAdminExample.java
+++ b/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/eventadmin/EventAdminExample.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.karaf.rcomp.examples.mqtt;
+package org.apache.karaf.rcomp.examples.eventadmin;
 
 import java.util.Map;
 
diff --git a/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttEmitter.java b/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttEmitter.java
index 14f8dbf..b8dc211 100644
--- a/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttEmitter.java
+++ b/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttEmitter.java
@@ -46,12 +46,10 @@
         Flux.interval(Duration.ofSeconds(1))
             .map(ByteArrayConverter::fromLong)
             .subscribe(toTopic);
-        LOG.info("mqtt test component started4");
     }
     
     @Deactivate
     public void stop() throws Exception {
-        LOG.info("mqtt test component stopped");
         toTopic.onComplete();
     }
 
diff --git a/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttReceiver.java b/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttReceiver.java
index 03eca67..a2cc389 100644
--- a/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttReceiver.java
+++ b/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttReceiver.java
@@ -43,7 +43,6 @@
 
     @Activate
     public void start() throws Exception {
-        LOG.info("Starting mqtt receiver");
         Publisher<byte[]> fromTopic = mqtt.from("output", byte[].class);
         flux = Flux.from(fromTopic)
             .map(ByteArrayConverter::asDouble)
diff --git a/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaComponent.java b/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaComponent.java
index 476f68b..e000eba 100644
--- a/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaComponent.java
+++ b/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaComponent.java
@@ -18,6 +18,7 @@
 
 import java.util.Dictionary;
 
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
 import org.apache.karaf.rcomp.api.ProvComp;
 import org.apache.karaf.rcomp.api.RComponent;
 import org.osgi.service.component.ComponentContext;
@@ -26,7 +27,6 @@
 import org.osgi.service.component.annotations.ConfigurationPolicy;
 import org.osgi.service.component.annotations.Deactivate;
 import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
 
 @ProvComp(name="kafka")
 @Component(immediate = true, 
@@ -58,7 +58,7 @@
     }
 
     @Override
-    public <T> Subscriber<T> to(String topic, Class<T> type) {
+    public <T> CloseableSubscriber<T> to(String topic, Class<T> type) {
         return new KafkaDestination<T>(ConfigMapper.mapProducer(rawConfig), topic, type);
     }
 
diff --git a/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaDestination.java b/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaDestination.java
index 3dbf2d5..43df467 100644
--- a/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaDestination.java
+++ b/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaDestination.java
@@ -22,13 +22,13 @@
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.reactivestreams.Subscriber;
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
 import org.reactivestreams.Subscription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("rawtypes")
-public class KafkaDestination<T> implements Subscriber<T>, AutoCloseable {
+public class KafkaDestination<T> implements CloseableSubscriber<T> {
     private final static Logger LOGGER = LoggerFactory.getLogger(KafkaDestination.class);
     private String topic;
     private Subscription subscription;
@@ -91,7 +91,7 @@
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
         if (subscription != null) {
             subscription.cancel();
             subscription = null;
diff --git a/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailComponent.java b/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailComponent.java
index 89c41d1..ef5a534 100644
--- a/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailComponent.java
+++ b/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailComponent.java
@@ -18,12 +18,12 @@
 
 import javax.mail.Session;
 
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
 import org.apache.karaf.rcomp.api.ProvComp;
 import org.apache.karaf.rcomp.api.RComponent;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
 
 @ProvComp(name="mail")
 @Component(property="name=mail")
@@ -38,7 +38,7 @@
     }  
     
     @Override
-    public <T> Subscriber<T> to(String destination, Class<T> type) {
+    public <T> CloseableSubscriber<T> to(String destination, Class<T> type) {
         return new MailDestination<T>(destination, type);
     }
 
diff --git a/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailDestination.java b/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailDestination.java
index ac5d00c..4f04bac 100644
--- a/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailDestination.java
+++ b/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailDestination.java
@@ -22,10 +22,10 @@
 import javax.mail.internet.InternetAddress;
 import javax.mail.internet.MimeMessage;
 
-import org.reactivestreams.Subscriber;
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
 import org.reactivestreams.Subscription;
 
-public class MailDestination<T> implements Subscriber<T> {
+public class MailDestination<T> implements CloseableSubscriber<T> {
 
     private String destination;
     private Subscription subscription;
@@ -67,4 +67,9 @@
     public void onComplete() {
     }
 
+    @Override
+    public void close() {
+        subscription.cancel();
+    }
+
 }
diff --git a/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttComponent.java b/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttComponent.java
index 6fd5f22..9824967 100644
--- a/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttComponent.java
+++ b/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttComponent.java
@@ -19,9 +19,11 @@
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
 import org.apache.karaf.rcomp.api.ProvComp;
 import org.apache.karaf.rcomp.api.RComponent;
 import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.osgi.service.component.annotations.Activate;
@@ -29,11 +31,13 @@
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
 import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @ProvComp(name="mqtt")
 @Component(property="name=mqtt")
 public class MqttComponent implements RComponent {
+    private static Logger LOGGER = LoggerFactory.getLogger(MqttComponent.class);
     
     MqttClient client;
     private Set<MqttDestination<?>> destinations = new HashSet<>();
@@ -42,17 +46,26 @@
     @interface MqttConfig {
         String serverUrl() default "tcp://localhost:1883";
         String clientId();
+        String userName();
+        String password();
     }
     
     @Activate
     public void activate(MqttConfig config) throws MqttException {
         client = new MqttClient(config.serverUrl(), MqttClient.generateClientId(),
                                 new MemoryPersistence());
-        client.connect();
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setAutomaticReconnect(true);
+        options.setUserName(config.userName());
+        if (config.password() != null) {
+            options.setPassword(config.password().toCharArray());
+        }
+        client.connect(options);
     }
     
     @Deactivate
     public void deactivate() throws MqttException {
+        LOGGER.info("Shutting down mqtt component with " + destinations.size() + " desitnations");
         for (MqttDestination<?> destination : destinations) {
             try {
                 destination.close(); 
@@ -65,11 +78,13 @@
 
     @Override
     public <T> Publisher<T> from(String topic, Class<T> type) {
+        LOGGER.info("Creating mqtt Publisher on topic " + topic);
         return new MqttSource<T>(client, topic, type);
     }  
     
     @Override
-    public <T> Subscriber<T> to(String topic, Class<T> type) {
+    public <T> CloseableSubscriber<T> to(String topic, Class<T> type) {
+        LOGGER.info("Creating mqtt Subscriber on topic " + topic);
         MqttDestination<T> destination = new MqttDestination<T>(client, topic, type);
         destinations.add(destination);
         return destination;
diff --git a/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttDestination.java b/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttDestination.java
index 44d03f6..79dc1c1 100644
--- a/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttDestination.java
+++ b/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttDestination.java
@@ -16,12 +16,12 @@
  */
 package org.apache.karaf.rcomp.mqtt;
 
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-public class MqttDestination<T> implements Subscriber<T>, AutoCloseable {
+public class MqttDestination<T> implements CloseableSubscriber<T> {
     
     private MqttClient client;
     private String topic;
@@ -68,7 +68,7 @@
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
         if (subscription != null) {
             subscription.cancel();
             subscription = null;