Switch API to use Subscriber
diff --git a/reactortest/src/main/java/component/api/MComponent.java b/reactortest/src/main/java/component/api/MComponent.java
index 1a9943b..7f7027c 100644
--- a/reactortest/src/main/java/component/api/MComponent.java
+++ b/reactortest/src/main/java/component/api/MComponent.java
@@ -1,11 +1,11 @@
package component.api;
-import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
public interface MComponent<F> {
<T> Publisher<T> from(String destination, Function<F, T> converter) throws Exception;
- <T> Consumer<T> to(String destination, Function<T, F> converter) throws Exception;
+ <T> Subscriber<T> to(String destination, Function<T, F> converter) throws Exception;
}
diff --git a/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java b/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java
index 6bb1b40..c2ce58f 100644
--- a/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java
+++ b/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java
@@ -1,7 +1,6 @@
package component.eventadmin;
import java.util.Map;
-import java.util.function.Consumer;
import java.util.function.Function;
import org.eclipse.paho.client.mqttv3.MqttException;
@@ -12,6 +11,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.EventAdmin;
import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
import component.api.MComponent;
@@ -39,7 +39,7 @@
}
@Override
- public <T> Consumer<T> to(String topic, Function<T, Map<String, ?>> converter) throws Exception {
+ public <T> Subscriber<T> to(String topic, Function<T, Map<String, ?>> converter) throws Exception {
return new EventAdminDestination<T>(client, topic, converter);
}
diff --git a/reactortest/src/main/java/component/eventadmin/EventAdminDestination.java b/reactortest/src/main/java/component/eventadmin/EventAdminDestination.java
index 6144056..a1a8b96 100644
--- a/reactortest/src/main/java/component/eventadmin/EventAdminDestination.java
+++ b/reactortest/src/main/java/component/eventadmin/EventAdminDestination.java
@@ -1,17 +1,19 @@
package component.eventadmin;
import java.util.Map;
-import java.util.function.Consumer;
import java.util.function.Function;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
-public class EventAdminDestination<T> implements Consumer<T> {
+public class EventAdminDestination<T> implements Subscriber<T> {
private EventAdmin client;
private String topic;
private Function<T, Map<String, ?>> converter;
+ private Subscription subscription;
public EventAdminDestination(EventAdmin client, String topic, Function<T, Map<String, ?>> converter) {
this.client = client;
@@ -20,14 +22,29 @@
}
@Override
- public void accept(T payload) {
+ public void onSubscribe(Subscription s) {
+ this.subscription = s;
+ s.request(1);
+ }
+
+ @Override
+ public void onNext(T payload) {
try {
Event event = new Event(topic, converter.apply(payload));
this.client.sendEvent(event);
} catch (Exception e) {
throw new RuntimeException(e);
+ } finally {
+ subscription.request(1);
}
}
+ @Override
+ public void onError(Throwable t) {
+ }
+
+ @Override
+ public void onComplete() {
+ }
}
diff --git a/reactortest/src/main/java/component/mail/MailComponent.java b/reactortest/src/main/java/component/mail/MailComponent.java
index 962b0b8..a932271 100644
--- a/reactortest/src/main/java/component/mail/MailComponent.java
+++ b/reactortest/src/main/java/component/mail/MailComponent.java
@@ -1,6 +1,5 @@
package component.mail;
-import java.util.function.Consumer;
import java.util.function.Function;
import javax.mail.MethodNotSupportedException;
@@ -10,6 +9,7 @@
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
import component.api.MComponent;
@@ -25,7 +25,7 @@
}
@Override
- public <T> Consumer<T> to(String destination, Function<T, MimeMessage> converter) throws Exception {
+ public <T> Subscriber<T> to(String destination, Function<T, MimeMessage> converter) throws Exception {
return new MailDestination<T>(destination, converter);
}
diff --git a/reactortest/src/main/java/component/mail/MailDestination.java b/reactortest/src/main/java/component/mail/MailDestination.java
index 74cde05..33dd1da 100644
--- a/reactortest/src/main/java/component/mail/MailDestination.java
+++ b/reactortest/src/main/java/component/mail/MailDestination.java
@@ -1,6 +1,5 @@
package component.mail;
-import java.util.function.Consumer;
import java.util.function.Function;
import javax.mail.Address;
@@ -8,10 +7,14 @@
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
-public class MailDestination<T> implements Consumer<T> {
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+public class MailDestination<T> implements Subscriber<T> {
private String destination;
private Function<T, MimeMessage> converter;
+ private Subscription subscription;
public MailDestination(String destination, Function<T, MimeMessage> converter) {
this.destination = destination;
@@ -19,14 +22,30 @@
}
@Override
- public void accept(T payload) {
+ public void onSubscribe(Subscription s) {
+ this.subscription = s;
+ s.request(1);
+ }
+
+ @Override
+ public void onNext(T payload) {
try {
MimeMessage message = converter.apply(payload);
Address[] addresses = new Address[]{new InternetAddress(destination)};
Transport.send(message, addresses);
} catch (Exception e) {
throw new RuntimeException(e);
+ } finally {
+ subscription.request(1);
}
}
+ @Override
+ public void onError(Throwable t) {
+ }
+
+ @Override
+ public void onComplete() {
+ }
+
}
diff --git a/reactortest/src/main/java/component/mqtt/MqttComponent.java b/reactortest/src/main/java/component/mqtt/MqttComponent.java
index c5c6d08..a4c3b46 100644
--- a/reactortest/src/main/java/component/mqtt/MqttComponent.java
+++ b/reactortest/src/main/java/component/mqtt/MqttComponent.java
@@ -1,6 +1,5 @@
package component.mqtt;
-import java.util.function.Consumer;
import java.util.function.Function;
import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -10,6 +9,7 @@
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
import component.api.MComponent;
@@ -42,7 +42,7 @@
}
@Override
- public <T> Consumer<T> to(String topic, Function<T, byte[]> converter) throws Exception {
+ public <T> Subscriber<T> to(String topic, Function<T, byte[]> converter) throws Exception {
return new MqttDestination<T>(client, topic, converter);
}
diff --git a/reactortest/src/main/java/component/mqtt/MqttDestination.java b/reactortest/src/main/java/component/mqtt/MqttDestination.java
index dcca169..d5a874d 100644
--- a/reactortest/src/main/java/component/mqtt/MqttDestination.java
+++ b/reactortest/src/main/java/component/mqtt/MqttDestination.java
@@ -1,16 +1,18 @@
package component.mqtt;
-import java.util.function.Consumer;
import java.util.function.Function;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
-public class MqttDestination<T> implements Consumer<T> {
+public class MqttDestination<T> implements Subscriber<T> {
private MqttClient client;
private String topic;
private Function<T, byte[]> converter;
+ private Subscription subscription;
public MqttDestination(MqttClient client, String topic, Function<T, byte[]> converter) {
this.client = client;
@@ -19,14 +21,31 @@
}
@Override
- public void accept(T payload) {
+ public void onSubscribe(Subscription s) {
+ this.subscription = s;
+ s.request(1);
+ }
+
+ @Override
+ public void onNext(T payload) {
try {
MqttMessage message = new MqttMessage(converter.apply(payload));
this.client.publish(topic, message);
} catch (Exception e) {
throw new RuntimeException(e);
+ } finally {
+ subscription.request(1);
}
}
+ @Override
+ public void onError(Throwable t) {
+ System.out.println("onerr");
+ }
+
+ @Override
+ public void onComplete() {
+ System.out.println("oncomplete");
+ }
}
diff --git a/reactortest/src/main/java/reactortest/ExampleEventAdmin.java b/reactortest/src/main/java/reactortest/ExampleEventAdmin.java
index 3291758..47ff7b7 100644
--- a/reactortest/src/main/java/reactortest/ExampleEventAdmin.java
+++ b/reactortest/src/main/java/reactortest/ExampleEventAdmin.java
@@ -1,12 +1,12 @@
package reactortest;
import java.util.Map;
-import java.util.function.Consumer;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,7 +23,7 @@
@Activate
public void start() throws Exception {
Publisher<Map<String, ?>> fromTopic = eventAdmin.from("input", Map2Map::convert);
- Consumer<Map<String, ?>> toTopic = eventAdmin.to("output", Map2Map::convert);
+ Subscriber<Map<String, ?>> toTopic = eventAdmin.to("output", Map2Map::convert);
Flux.from(fromTopic)
.log()
.subscribe(toTopic);
diff --git a/reactortest/src/main/java/reactortest/MqttExampleComponent.java b/reactortest/src/main/java/reactortest/MqttExampleComponent.java
index a90e3fc..8d079bd 100644
--- a/reactortest/src/main/java/reactortest/MqttExampleComponent.java
+++ b/reactortest/src/main/java/reactortest/MqttExampleComponent.java
@@ -1,11 +1,10 @@
package reactortest;
-import java.util.function.Consumer;
-
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,7 +23,7 @@
public void start() throws Exception {
LOG.info("Starting mqtt test component");
Publisher<Integer> fromTopic = mqtt.from("input", ByteArrayConverter::asInteger);
- Consumer<Double> toTopic = mqtt.to("output", DoubleConverter::asByteAr);
+ Subscriber<Double> toTopic = mqtt.to("output", DoubleConverter::asByteAr);
Flux.from(fromTopic)
.log()
.window(2, 1)
diff --git a/reactortest/src/test/java/component/mail/TestMail.java b/reactortest/src/test/java/component/mail/TestMail.java
index 16d9ea9..a37fec0 100644
--- a/reactortest/src/test/java/component/mail/TestMail.java
+++ b/reactortest/src/test/java/component/mail/TestMail.java
@@ -2,7 +2,6 @@
import java.util.HashMap;
import java.util.Map;
-import java.util.function.Consumer;
import javax.mail.MessagingException;
import javax.mail.Session;
@@ -10,9 +9,8 @@
import org.junit.Ignore;
import org.junit.Test;
+import org.reactivestreams.Subscriber;
-import component.mail.MailComponent;
-import component.mail.SessionComponent;
import reactor.core.publisher.Flux;
public class TestMail {
@@ -27,7 +25,7 @@
MailComponent mail = new MailComponent();
mail.session = session;
- Consumer<String> to = mail.to("cschneider@localhost", txt -> createMessage(session, txt));
+ Subscriber<String> to = mail.to("cschneider@localhost", txt -> createMessage(session, txt));
Flux.just("Test").subscribe(to);
}
diff --git a/reactortest/src/test/java/component/mqtt/Test1.java b/reactortest/src/test/java/component/mqtt/Test1.java
index f6a0dfa..2c15d1f 100644
--- a/reactortest/src/test/java/component/mqtt/Test1.java
+++ b/reactortest/src/test/java/component/mqtt/Test1.java
@@ -3,7 +3,8 @@
import static java.time.Duration.of;
import java.time.temporal.ChronoUnit;
-import java.util.function.Consumer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -11,6 +12,7 @@
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.math.MathFlux;
@@ -41,12 +43,13 @@
@Test
public void testMQtt() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
MqttClient client = new MqttClient("tcp://192.168.0.126:1883", MqttClient.generateClientId());
client.connect();
MqttComponent mqtt = new MqttComponent();
mqtt.client = client;
Publisher<Integer> fromTopic = mqtt.from("input", ByteArrayConverter::asInteger);
- Consumer<Double> toTopic = mqtt.to("output", DoubleConverter::asByteAr);
+ Subscriber<Double> toTopic = mqtt.to("output", DoubleConverter::asByteAr);
Flux.from(fromTopic)
.log()
.window(2, 1)
@@ -56,10 +59,11 @@
client.subscribe("output", (topic, message) -> {
result = ByteArrayConverter.asDouble(message.getPayload());
+ latch.countDown();
});
client.publish("input", new MqttMessage(ByteArrayConverter.fromInteger(2)));
client.publish("input", new MqttMessage(new Integer(2).toString().getBytes()));
- Thread.sleep(100);
+ latch.await(10, TimeUnit.SECONDS);
Assert.assertEquals(2, result, 0.1);
client.disconnect();
client.close();