Switch API to use Subscriber
diff --git a/reactortest/src/main/java/component/api/MComponent.java b/reactortest/src/main/java/component/api/MComponent.java
index 1a9943b..7f7027c 100644
--- a/reactortest/src/main/java/component/api/MComponent.java
+++ b/reactortest/src/main/java/component/api/MComponent.java
@@ -1,11 +1,11 @@
 package component.api;
 
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
 
 public interface MComponent<F> {
     <T> Publisher<T> from(String destination, Function<F, T> converter) throws Exception;
-    <T> Consumer<T> to(String destination, Function<T, F> converter) throws Exception;
+    <T> Subscriber<T> to(String destination, Function<T, F> converter) throws Exception;
 }
diff --git a/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java b/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java
index 6bb1b40..c2ce58f 100644
--- a/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java
+++ b/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java
@@ -1,7 +1,6 @@
 package component.eventadmin;
 
 import java.util.Map;
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 import org.eclipse.paho.client.mqttv3.MqttException;
@@ -12,6 +11,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.EventAdmin;
 import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
 
 import component.api.MComponent;
 
@@ -39,7 +39,7 @@
     }  
     
     @Override
-    public <T> Consumer<T> to(String topic, Function<T, Map<String, ?>> converter) throws Exception {
+    public <T> Subscriber<T> to(String topic, Function<T, Map<String, ?>> converter) throws Exception {
         return new EventAdminDestination<T>(client, topic, converter);
     }
 
diff --git a/reactortest/src/main/java/component/eventadmin/EventAdminDestination.java b/reactortest/src/main/java/component/eventadmin/EventAdminDestination.java
index 6144056..a1a8b96 100644
--- a/reactortest/src/main/java/component/eventadmin/EventAdminDestination.java
+++ b/reactortest/src/main/java/component/eventadmin/EventAdminDestination.java
@@ -1,17 +1,19 @@
 package component.eventadmin;
 
 import java.util.Map;
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
 
-public class EventAdminDestination<T> implements Consumer<T> {
+public class EventAdminDestination<T> implements Subscriber<T> {
     
     private EventAdmin client;
     private String topic;
     private Function<T, Map<String, ?>> converter;
+    private Subscription subscription;
 
     public EventAdminDestination(EventAdmin client, String topic, Function<T, Map<String, ?>> converter) {
         this.client = client;
@@ -20,14 +22,29 @@
     }
 
     @Override
-    public void accept(T payload) {
+    public void onSubscribe(Subscription s) {
+        this.subscription = s;
+        s.request(1);
+    }
+
+    @Override
+    public void onNext(T payload) {
         try {
             Event event = new Event(topic, converter.apply(payload));
             this.client.sendEvent(event);
         } catch (Exception e) {
             throw new RuntimeException(e);
+        } finally {
+            subscription.request(1);
         }
     }
 
+    @Override
+    public void onError(Throwable t) {
+    }
+
+    @Override
+    public void onComplete() {
+    }
 
 }
diff --git a/reactortest/src/main/java/component/mail/MailComponent.java b/reactortest/src/main/java/component/mail/MailComponent.java
index 962b0b8..a932271 100644
--- a/reactortest/src/main/java/component/mail/MailComponent.java
+++ b/reactortest/src/main/java/component/mail/MailComponent.java
@@ -1,6 +1,5 @@
 package component.mail;
 
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 import javax.mail.MethodNotSupportedException;
@@ -10,6 +9,7 @@
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
 
 import component.api.MComponent;
 
@@ -25,7 +25,7 @@
     }  
     
     @Override
-    public <T> Consumer<T> to(String destination, Function<T, MimeMessage> converter) throws Exception {
+    public <T> Subscriber<T> to(String destination, Function<T, MimeMessage> converter) throws Exception {
         return new MailDestination<T>(destination, converter);
     }
 
diff --git a/reactortest/src/main/java/component/mail/MailDestination.java b/reactortest/src/main/java/component/mail/MailDestination.java
index 74cde05..33dd1da 100644
--- a/reactortest/src/main/java/component/mail/MailDestination.java
+++ b/reactortest/src/main/java/component/mail/MailDestination.java
@@ -1,6 +1,5 @@
 package component.mail;
 
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 import javax.mail.Address;
@@ -8,10 +7,14 @@
 import javax.mail.internet.InternetAddress;
 import javax.mail.internet.MimeMessage;
 
-public class MailDestination<T> implements Consumer<T> {
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+public class MailDestination<T> implements Subscriber<T> {
 
     private String destination;
     private Function<T, MimeMessage> converter;
+    private Subscription subscription;
 
     public MailDestination(String destination, Function<T, MimeMessage> converter) {
         this.destination = destination;
@@ -19,14 +22,30 @@
     }
 
     @Override
-    public void accept(T payload) {
+    public void onSubscribe(Subscription s) {
+        this.subscription = s;
+        s.request(1);
+    }
+
+    @Override
+    public void onNext(T payload) {
         try {
             MimeMessage message = converter.apply(payload);
             Address[] addresses = new Address[]{new InternetAddress(destination)};
             Transport.send(message, addresses);
         } catch (Exception e) {
             throw new RuntimeException(e);
+        } finally {
+            subscription.request(1);
         }
     }
 
+    @Override
+    public void onError(Throwable t) {
+    }
+
+    @Override
+    public void onComplete() {
+    }
+
 }
diff --git a/reactortest/src/main/java/component/mqtt/MqttComponent.java b/reactortest/src/main/java/component/mqtt/MqttComponent.java
index c5c6d08..a4c3b46 100644
--- a/reactortest/src/main/java/component/mqtt/MqttComponent.java
+++ b/reactortest/src/main/java/component/mqtt/MqttComponent.java
@@ -1,6 +1,5 @@
 package component.mqtt;
 
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -10,6 +9,7 @@
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
 import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
 
 import component.api.MComponent;
 
@@ -42,7 +42,7 @@
     }  
     
     @Override
-    public <T> Consumer<T> to(String topic, Function<T, byte[]> converter) throws Exception {
+    public <T> Subscriber<T> to(String topic, Function<T, byte[]> converter) throws Exception {
         return new MqttDestination<T>(client, topic, converter);
     }
 
diff --git a/reactortest/src/main/java/component/mqtt/MqttDestination.java b/reactortest/src/main/java/component/mqtt/MqttDestination.java
index dcca169..d5a874d 100644
--- a/reactortest/src/main/java/component/mqtt/MqttDestination.java
+++ b/reactortest/src/main/java/component/mqtt/MqttDestination.java
@@ -1,16 +1,18 @@
 package component.mqtt;
 
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
 
-public class MqttDestination<T> implements Consumer<T> {
+public class MqttDestination<T> implements Subscriber<T> {
     
     private MqttClient client;
     private String topic;
     private Function<T, byte[]> converter;
+    private Subscription subscription;
 
     public MqttDestination(MqttClient client, String topic, Function<T, byte[]> converter) {
         this.client = client;
@@ -19,14 +21,31 @@
     }
 
     @Override
-    public void accept(T payload) {
+    public void onSubscribe(Subscription s) {
+        this.subscription = s;
+        s.request(1);
+    }
+
+    @Override
+    public void onNext(T payload) {
         try {
             MqttMessage message = new MqttMessage(converter.apply(payload));
             this.client.publish(topic, message);
         } catch (Exception e) {
             throw new RuntimeException(e);
+        } finally {
+            subscription.request(1);
         }
     }
 
+    @Override
+    public void onError(Throwable t) {
+        System.out.println("onerr");
+    }
+
+    @Override
+    public void onComplete() {
+        System.out.println("oncomplete");
+    }
 
 }
diff --git a/reactortest/src/main/java/reactortest/ExampleEventAdmin.java b/reactortest/src/main/java/reactortest/ExampleEventAdmin.java
index 3291758..47ff7b7 100644
--- a/reactortest/src/main/java/reactortest/ExampleEventAdmin.java
+++ b/reactortest/src/main/java/reactortest/ExampleEventAdmin.java
@@ -1,12 +1,12 @@
 package reactortest;
 
 import java.util.Map;
-import java.util.function.Consumer;
 
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,7 +23,7 @@
     @Activate
     public void start() throws Exception {
         Publisher<Map<String, ?>> fromTopic = eventAdmin.from("input", Map2Map::convert);
-        Consumer<Map<String, ?>> toTopic = eventAdmin.to("output", Map2Map::convert);
+        Subscriber<Map<String, ?>> toTopic = eventAdmin.to("output", Map2Map::convert);
         Flux.from(fromTopic)
             .log()
             .subscribe(toTopic);
diff --git a/reactortest/src/main/java/reactortest/MqttExampleComponent.java b/reactortest/src/main/java/reactortest/MqttExampleComponent.java
index a90e3fc..8d079bd 100644
--- a/reactortest/src/main/java/reactortest/MqttExampleComponent.java
+++ b/reactortest/src/main/java/reactortest/MqttExampleComponent.java
@@ -1,11 +1,10 @@
 package reactortest;
 
-import java.util.function.Consumer;
-
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -24,7 +23,7 @@
     public void start() throws Exception {
         LOG.info("Starting mqtt test component");
         Publisher<Integer> fromTopic = mqtt.from("input", ByteArrayConverter::asInteger);
-        Consumer<Double> toTopic = mqtt.to("output", DoubleConverter::asByteAr);
+        Subscriber<Double> toTopic = mqtt.to("output", DoubleConverter::asByteAr);
         Flux.from(fromTopic)
             .log()
             .window(2, 1)
diff --git a/reactortest/src/test/java/component/mail/TestMail.java b/reactortest/src/test/java/component/mail/TestMail.java
index 16d9ea9..a37fec0 100644
--- a/reactortest/src/test/java/component/mail/TestMail.java
+++ b/reactortest/src/test/java/component/mail/TestMail.java
@@ -2,7 +2,6 @@
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.function.Consumer;
 
 import javax.mail.MessagingException;
 import javax.mail.Session;
@@ -10,9 +9,8 @@
 
 import org.junit.Ignore;
 import org.junit.Test;
+import org.reactivestreams.Subscriber;
 
-import component.mail.MailComponent;
-import component.mail.SessionComponent;
 import reactor.core.publisher.Flux;
 
 public class TestMail {
@@ -27,7 +25,7 @@
         MailComponent mail = new MailComponent();
         mail.session = session;
 
-        Consumer<String> to = mail.to("cschneider@localhost", txt -> createMessage(session, txt));
+        Subscriber<String> to = mail.to("cschneider@localhost", txt -> createMessage(session, txt));
         Flux.just("Test").subscribe(to);
     }
 
diff --git a/reactortest/src/test/java/component/mqtt/Test1.java b/reactortest/src/test/java/component/mqtt/Test1.java
index f6a0dfa..2c15d1f 100644
--- a/reactortest/src/test/java/component/mqtt/Test1.java
+++ b/reactortest/src/test/java/component/mqtt/Test1.java
@@ -3,7 +3,8 @@
 import static java.time.Duration.of;
 
 import java.time.temporal.ChronoUnit;
-import java.util.function.Consumer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -11,6 +12,7 @@
 import org.junit.Assert;
 import org.junit.Test;
 import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
 
 import reactor.core.publisher.Flux;
 import reactor.math.MathFlux;
@@ -41,12 +43,13 @@
 
     @Test
     public void testMQtt() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
         MqttClient client = new MqttClient("tcp://192.168.0.126:1883", MqttClient.generateClientId());
         client.connect();
         MqttComponent mqtt = new MqttComponent();
         mqtt.client = client;
         Publisher<Integer> fromTopic = mqtt.from("input", ByteArrayConverter::asInteger);
-        Consumer<Double> toTopic = mqtt.to("output", DoubleConverter::asByteAr);
+        Subscriber<Double> toTopic = mqtt.to("output", DoubleConverter::asByteAr);
         Flux.from(fromTopic)
             .log()
             .window(2, 1)
@@ -56,10 +59,11 @@
         
         client.subscribe("output", (topic, message) -> {
             result = ByteArrayConverter.asDouble(message.getPayload());
+            latch.countDown();
         });
         client.publish("input", new MqttMessage(ByteArrayConverter.fromInteger(2)));
         client.publish("input", new MqttMessage(new Integer(2).toString().getBytes()));
-        Thread.sleep(100);
+        latch.await(10, TimeUnit.SECONDS);
         Assert.assertEquals(2, result, 0.1);
         client.disconnect();
         client.close();