Add Closeable support
diff --git a/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/CloseableSubscriber.java b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/CloseableSubscriber.java
new file mode 100644
index 0000000..3506888
--- /dev/null
+++ b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/CloseableSubscriber.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.rcomp.api;
+
+import java.io.Closeable;
+
+import org.reactivestreams.Subscriber;
+
+public interface CloseableSubscriber<T> extends Subscriber<T>, Closeable {
+
+}
diff --git a/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/CloseableSubscriberAdapter.java b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/CloseableSubscriberAdapter.java
new file mode 100644
index 0000000..d724f16
--- /dev/null
+++ b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/CloseableSubscriberAdapter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.rcomp.api;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * Adapts an arbitrary subscriber implementing Closeable to CloseableSubscriber
+ */
+public class CloseableSubscriberAdapter<T> implements CloseableSubscriber<T> {
+
+ private Subscriber<T> subscriber;
+
+ public CloseableSubscriberAdapter(Subscriber<T> subscriber) {
+// if (!(subscriber instanceof Closeable)) {
+// throw new IllegalArgumentException("Subscriber must implement Closeable");
+// }
+ this.subscriber = subscriber;
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ subscriber.onSubscribe(s);
+ }
+
+ @Override
+ public void onNext(T t) {
+ subscriber.onNext(t);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ subscriber.onError(t);
+ }
+
+ @Override
+ public void onComplete() {
+ subscriber.onComplete();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (subscriber instanceof Closeable) {
+ ((Closeable)subscriber).close();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/ProvComp.java b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/ProvComp.java
index 18096f4..8cb80eb 100644
--- a/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/ProvComp.java
+++ b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/ProvComp.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.karaf.rcomp.api;
import aQute.bnd.annotation.headers.ProvideCapability;
diff --git a/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/RComponent.java b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/RComponent.java
index 323ecff..c381be4 100644
--- a/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/RComponent.java
+++ b/rcomp-api/src/main/java/org/apache/karaf/rcomp/api/RComponent.java
@@ -17,9 +17,8 @@
package org.apache.karaf.rcomp.api;
import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
public interface RComponent {
<T> Publisher<T> from(String destination, Class<T> type);
- <T> Subscriber<T> to(String destination, Class<T> type);
+ <T> CloseableSubscriber<T> to(String destination, Class<T> type);
}
diff --git a/rcomp-camel/src/main/java/org/apache/karaf/rcomp/camel/CamelComponent.java b/rcomp-camel/src/main/java/org/apache/karaf/rcomp/camel/CamelComponent.java
index 90334d6..d8a8836 100644
--- a/rcomp-camel/src/main/java/org/apache/karaf/rcomp/camel/CamelComponent.java
+++ b/rcomp-camel/src/main/java/org/apache/karaf/rcomp/camel/CamelComponent.java
@@ -21,6 +21,8 @@
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.core.osgi.OsgiDefaultCamelContext;
import org.apache.camel.core.osgi.OsgiServiceRegistry;
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
+import org.apache.karaf.rcomp.api.CloseableSubscriberAdapter;
import org.apache.karaf.rcomp.api.ProvComp;
import org.apache.karaf.rcomp.api.RComponent;
import org.osgi.framework.BundleContext;
@@ -29,7 +31,6 @@
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
@ProvComp(name = "camel")
@Component(property = "name=camel")
@@ -69,7 +70,7 @@
}
@Override
- public <T> Subscriber<T> to(String topic, Class<T> type) {
- return camelreactive.subscriber(topic, type);
+ public <T> CloseableSubscriber<T> to(String topic, Class<T> type) {
+ return new CloseableSubscriberAdapter<T>(camelreactive.subscriber(topic, type));
}
}
diff --git a/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminComponent.java b/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminComponent.java
index 3f3158e..0c8ea2e 100644
--- a/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminComponent.java
+++ b/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminComponent.java
@@ -16,6 +16,7 @@
*/
package org.apache.karaf.rcomp.eventadmin;
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
import org.apache.karaf.rcomp.api.ProvComp;
import org.apache.karaf.rcomp.api.RComponent;
import org.osgi.framework.BundleContext;
@@ -25,7 +26,6 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.EventAdmin;
import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
@ProvComp(name="eventAdmin")
@Component(property="name=eventAdmin")
@@ -52,7 +52,7 @@
}
@Override
- public <T> Subscriber<T> to(String topic, Class<T> type) {
+ public <T> CloseableSubscriber<T> to(String topic, Class<T> type) {
return new EventAdminDestination<T>(client, topic, type);
}
diff --git a/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminDestination.java b/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminDestination.java
index 61d3b5c..f692e98 100644
--- a/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminDestination.java
+++ b/rcomp-eventadmin/src/main/java/org/apache/karaf/rcomp/eventadmin/EventAdminDestination.java
@@ -18,15 +18,13 @@
import java.util.Map;
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
-import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-import org.slf4j.LoggerFactory;
-public class EventAdminDestination<T> implements Subscriber<T> {
- private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(EventAdminDestination.class);
-
+public class EventAdminDestination<T> implements CloseableSubscriber<T> {
+
private EventAdmin client;
private String topic;
private Subscription subscription;
@@ -76,4 +74,9 @@
public void onComplete() {
}
+ @Override
+ public void close() {
+ this.subscription.cancel();
+ }
+
}
diff --git a/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/EventAdminExample.java b/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/eventadmin/EventAdminExample.java
similarity index 96%
rename from rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/EventAdminExample.java
rename to rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/eventadmin/EventAdminExample.java
index a309631..6fea04f 100644
--- a/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/EventAdminExample.java
+++ b/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/eventadmin/EventAdminExample.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.karaf.rcomp.examples.mqtt;
+package org.apache.karaf.rcomp.examples.eventadmin;
import java.util.Map;
diff --git a/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttEmitter.java b/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttEmitter.java
index 14f8dbf..b8dc211 100644
--- a/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttEmitter.java
+++ b/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttEmitter.java
@@ -46,12 +46,10 @@
Flux.interval(Duration.ofSeconds(1))
.map(ByteArrayConverter::fromLong)
.subscribe(toTopic);
- LOG.info("mqtt test component started4");
}
@Deactivate
public void stop() throws Exception {
- LOG.info("mqtt test component stopped");
toTopic.onComplete();
}
diff --git a/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttReceiver.java b/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttReceiver.java
index 03eca67..a2cc389 100644
--- a/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttReceiver.java
+++ b/rcomp-examples/rcomp-examples/src/main/java/org/apache/karaf/rcomp/examples/mqtt/MqttReceiver.java
@@ -43,7 +43,6 @@
@Activate
public void start() throws Exception {
- LOG.info("Starting mqtt receiver");
Publisher<byte[]> fromTopic = mqtt.from("output", byte[].class);
flux = Flux.from(fromTopic)
.map(ByteArrayConverter::asDouble)
diff --git a/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaComponent.java b/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaComponent.java
index 476f68b..e000eba 100644
--- a/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaComponent.java
+++ b/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaComponent.java
@@ -18,6 +18,7 @@
import java.util.Dictionary;
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
import org.apache.karaf.rcomp.api.ProvComp;
import org.apache.karaf.rcomp.api.RComponent;
import org.osgi.service.component.ComponentContext;
@@ -26,7 +27,6 @@
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
@ProvComp(name="kafka")
@Component(immediate = true,
@@ -58,7 +58,7 @@
}
@Override
- public <T> Subscriber<T> to(String topic, Class<T> type) {
+ public <T> CloseableSubscriber<T> to(String topic, Class<T> type) {
return new KafkaDestination<T>(ConfigMapper.mapProducer(rawConfig), topic, type);
}
diff --git a/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaDestination.java b/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaDestination.java
index 3dbf2d5..43df467 100644
--- a/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaDestination.java
+++ b/rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaDestination.java
@@ -22,13 +22,13 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-import org.reactivestreams.Subscriber;
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("rawtypes")
-public class KafkaDestination<T> implements Subscriber<T>, AutoCloseable {
+public class KafkaDestination<T> implements CloseableSubscriber<T> {
private final static Logger LOGGER = LoggerFactory.getLogger(KafkaDestination.class);
private String topic;
private Subscription subscription;
@@ -91,7 +91,7 @@
}
@Override
- public void close() throws Exception {
+ public void close() {
if (subscription != null) {
subscription.cancel();
subscription = null;
diff --git a/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailComponent.java b/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailComponent.java
index 89c41d1..ef5a534 100644
--- a/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailComponent.java
+++ b/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailComponent.java
@@ -18,12 +18,12 @@
import javax.mail.Session;
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
import org.apache.karaf.rcomp.api.ProvComp;
import org.apache.karaf.rcomp.api.RComponent;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
@ProvComp(name="mail")
@Component(property="name=mail")
@@ -38,7 +38,7 @@
}
@Override
- public <T> Subscriber<T> to(String destination, Class<T> type) {
+ public <T> CloseableSubscriber<T> to(String destination, Class<T> type) {
return new MailDestination<T>(destination, type);
}
diff --git a/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailDestination.java b/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailDestination.java
index ac5d00c..4f04bac 100644
--- a/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailDestination.java
+++ b/rcomp-mail/src/main/java/org/apache/karaf/rcomp/mail/MailDestination.java
@@ -22,10 +22,10 @@
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
-import org.reactivestreams.Subscriber;
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
import org.reactivestreams.Subscription;
-public class MailDestination<T> implements Subscriber<T> {
+public class MailDestination<T> implements CloseableSubscriber<T> {
private String destination;
private Subscription subscription;
@@ -67,4 +67,9 @@
public void onComplete() {
}
+ @Override
+ public void close() {
+ subscription.cancel();
+ }
+
}
diff --git a/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttComponent.java b/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttComponent.java
index 6fd5f22..9824967 100644
--- a/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttComponent.java
+++ b/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttComponent.java
@@ -19,9 +19,11 @@
import java.util.HashSet;
import java.util.Set;
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
import org.apache.karaf.rcomp.api.ProvComp;
import org.apache.karaf.rcomp.api.RComponent;
import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.osgi.service.component.annotations.Activate;
@@ -29,11 +31,13 @@
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@ProvComp(name="mqtt")
@Component(property="name=mqtt")
public class MqttComponent implements RComponent {
+ private static Logger LOGGER = LoggerFactory.getLogger(MqttComponent.class);
MqttClient client;
private Set<MqttDestination<?>> destinations = new HashSet<>();
@@ -42,17 +46,26 @@
@interface MqttConfig {
String serverUrl() default "tcp://localhost:1883";
String clientId();
+ String userName();
+ String password();
}
@Activate
public void activate(MqttConfig config) throws MqttException {
client = new MqttClient(config.serverUrl(), MqttClient.generateClientId(),
new MemoryPersistence());
- client.connect();
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setAutomaticReconnect(true);
+ options.setUserName(config.userName());
+ if (config.password() != null) {
+ options.setPassword(config.password().toCharArray());
+ }
+ client.connect(options);
}
@Deactivate
public void deactivate() throws MqttException {
+ LOGGER.info("Shutting down mqtt component with " + destinations.size() + " desitnations");
for (MqttDestination<?> destination : destinations) {
try {
destination.close();
@@ -65,11 +78,13 @@
@Override
public <T> Publisher<T> from(String topic, Class<T> type) {
+ LOGGER.info("Creating mqtt Publisher on topic " + topic);
return new MqttSource<T>(client, topic, type);
}
@Override
- public <T> Subscriber<T> to(String topic, Class<T> type) {
+ public <T> CloseableSubscriber<T> to(String topic, Class<T> type) {
+ LOGGER.info("Creating mqtt Subscriber on topic " + topic);
MqttDestination<T> destination = new MqttDestination<T>(client, topic, type);
destinations.add(destination);
return destination;
diff --git a/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttDestination.java b/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttDestination.java
index 44d03f6..79dc1c1 100644
--- a/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttDestination.java
+++ b/rcomp-mqtt/src/main/java/org/apache/karaf/rcomp/mqtt/MqttDestination.java
@@ -16,12 +16,12 @@
*/
package org.apache.karaf.rcomp.mqtt;
+import org.apache.karaf.rcomp.api.CloseableSubscriber;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-public class MqttDestination<T> implements Subscriber<T>, AutoCloseable {
+public class MqttDestination<T> implements CloseableSubscriber<T> {
private MqttClient client;
private String topic;
@@ -68,7 +68,7 @@
}
@Override
- public void close() throws Exception {
+ public void close() {
if (subscription != null) {
subscription.cancel();
subscription = null;