Allow to specify a type to deliver / consume in the API
diff --git a/pom.xml b/pom.xml
index b6359d4..901ef1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,6 +7,9 @@
     <packaging>pom</packaging>
 
     <properties>
+    	<reactor.version>3.0.7.RELEASE</reactor.version>
+    	<jetty.version>8.1.15.v20140411</jetty.version>
+    	<cxf.version>3.2.0-SNAPSHOT</cxf.version>
     </properties>
 
     <modules>
diff --git a/rcomp-api/src/main/java/component/api/MComponent.java b/rcomp-api/src/main/java/component/api/MComponent.java
index 67d0ea8..2573aeb 100644
--- a/rcomp-api/src/main/java/component/api/MComponent.java
+++ b/rcomp-api/src/main/java/component/api/MComponent.java
@@ -3,7 +3,7 @@
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 
-public interface MComponent<T> {
-    Publisher<T> from(String destination);
-    Subscriber<T> to(String destination);
+public interface MComponent {
+    <S> Publisher<S> from(String destination, Class<? extends S> type);
+    <S> Subscriber<S> to(String destination, Class<? extends S> type);
 }
diff --git a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java
index 5b264a0..f3506f7 100644
--- a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java
+++ b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java
@@ -1,7 +1,5 @@
 package component.eventadmin;
 
-import java.util.Map;
-
 import org.osgi.framework.BundleContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -14,7 +12,7 @@
 import component.api.MComponent;
 
 @Component(property="name=eventAdmin")
-public class EventAdminComponent implements MComponent<Map<String, ?>> {
+public class EventAdminComponent implements MComponent {
     
     private BundleContext context;
 
@@ -32,13 +30,13 @@
     }
 
     @Override
-    public Publisher<Map<String, ?>> from(String topic) {
-        return new EventAdminSource(context, topic);
+    public <T> Publisher<T> from(String topic, Class<? extends T> type) {
+        return new EventAdminSource<T>(context, topic, type);
     }  
     
     @Override
-    public Subscriber<Map<String, ?>> to(String topic) {
-        return new EventAdminDestination(client, topic);
+    public <T> Subscriber<T> to(String topic, Class<? extends T> type) {
+        return new EventAdminDestination<T>(client, topic, type);
     }
 
 }
diff --git a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java
index 0e7f667..9c58201 100644
--- a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java
+++ b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java
@@ -7,15 +7,18 @@
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-public class EventAdminDestination implements Subscriber<Map<String, ?>> {
+public class EventAdminDestination<T> implements Subscriber<T> {
     
     private EventAdmin client;
     private String topic;
     private Subscription subscription;
 
-    public EventAdminDestination(EventAdmin client, String topic) {
+    public EventAdminDestination(EventAdmin client, String topic, Class<? extends T> type) {
         this.client = client;
         this.topic = topic;
+        if (! type.equals(Map.class)) {
+            throw new IllegalArgumentException("Curently only Map<String, ?> is supported");
+        }
     }
 
     @Override
@@ -25,9 +28,9 @@
     }
 
     @Override
-    public void onNext(Map<String, ?> payload) {
+    public void onNext(T payload) {
         try {
-            Event event = new Event(topic, payload);
+            Event event = new Event(topic, convertTo(payload));
             this.client.sendEvent(event);
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -36,6 +39,11 @@
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private Map<String, ?> convertTo(T payload) {
+        return (Map<String, ?>) payload;
+    }
+
     @Override
     public void onError(Throwable t) {
     }
diff --git a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java
index bbb560b..0354f75 100644
--- a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java
+++ b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java
@@ -15,26 +15,29 @@
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-public class EventAdminSource implements Publisher<Map<String, ?>> {
+public class EventAdminSource<T> implements Publisher<T> {
     private BundleContext context;
     private String topic;
     
-    public EventAdminSource(BundleContext context, String topic) {
+    public EventAdminSource(BundleContext context, String topic, Class<? extends T> type) {
         this.context = context;
         this.topic = topic;
+        if (! type.equals(Map.class)) {
+            throw new IllegalArgumentException("Curently only Map<String, ?> is supported");
+        }
     }
     
     @Override
-    public void subscribe(Subscriber<? super Map<String, ?>> subscriber) {
+    public void subscribe(Subscriber<? super T> subscriber) {
         subscriber.onSubscribe(new EventAdminSubscription(subscriber));
     }
 
     public class EventAdminSubscription implements Subscription, EventHandler {
         private AtomicBoolean subScribed;
         private ServiceRegistration<EventHandler> sreg;
-        private Subscriber<? super Map<String, ?>> subscriber;
+        private Subscriber<? super T> subscriber;
         
-        public EventAdminSubscription(Subscriber<? super Map<String, ?>> subscriber) {
+        public EventAdminSubscription(Subscriber<? super T> subscriber) {
             this.subscriber = subscriber;
             this.subScribed = new AtomicBoolean(false);
         }
@@ -61,7 +64,12 @@
         
         @Override
         public void handleEvent(Event event) {
-            this.subscriber.onNext(toMap(event));
+            this.subscriber.onNext(convertTo(toMap(event)));
+        }
+
+        @SuppressWarnings("unchecked")
+        private T convertTo(Map<String, ?> map) {
+            return (T) map;
         }
 
         Map<String, ?> toMap(Event event) {
diff --git a/rcomp-examples/pom.xml b/rcomp-examples/pom.xml
index f726a2c..a45daa5 100644
--- a/rcomp-examples/pom.xml
+++ b/rcomp-examples/pom.xml
@@ -24,28 +24,30 @@
         <dependency>
             <groupId>io.projectreactor.addons</groupId>
             <artifactId>reactor-extra</artifactId>
-            <version>3.0.7.RELEASE</version>
+            <version>${reactor.version}</version>
         </dependency>
         <dependency>
             <groupId>io.projectreactor.addons</groupId>
             <artifactId>reactor-adapter</artifactId>
-            <version>3.0.7.RELEASE</version>
+            <version>${reactor.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-rs-client</artifactId>
-            <version>3.2.0-SNAPSHOT</version>
+            <version>${cxf.version}</version>
         </dependency>
         
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-server</artifactId>
-            <version>8.1.15.v20140411</version>
+            <version>${jetty.version}</version>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-servlet</artifactId>
-            <version>8.1.15.v20140411</version>
+            <version>${jetty.version}</version>
+            <scope>test</scope>
         </dependency>
 
     </dependencies>
diff --git a/rcomp-examples/src/main/java/reactortest/EventAdminExample.java b/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
index f4e490d..2417f0b 100644
--- a/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
+++ b/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
@@ -18,12 +18,12 @@
     Logger LOG = LoggerFactory.getLogger(EventAdminExample.class);
     
     @Reference(target="(name=eventAdmin)")
-    MComponent<Map<String, ? >> eventAdmin;
+    MComponent eventAdmin;
 
     @Activate
     public void start() throws Exception {
-        Publisher<Map<String, ?>> fromTopic = eventAdmin.from("input");
-        Subscriber<Map<String, ?>> toTopic = eventAdmin.to("output");
+        Publisher<Map<String, ?>> fromTopic = eventAdmin.from("input", Map.class);
+        Subscriber<Map<String, ?>> toTopic = eventAdmin.to("output", Map.class);
         Flux.from(fromTopic)
             .log()
             .subscribe(toTopic);
diff --git a/rcomp-examples/src/main/java/reactortest/MqttExample.java b/rcomp-examples/src/main/java/reactortest/MqttExample.java
index feb214e..1c37247 100644
--- a/rcomp-examples/src/main/java/reactortest/MqttExample.java
+++ b/rcomp-examples/src/main/java/reactortest/MqttExample.java
@@ -17,13 +17,13 @@
     Logger LOG = LoggerFactory.getLogger(MqttExample.class);
     
     @Reference(target="(name=mqtt)")
-    MComponent<byte[]> mqtt;
+    MComponent mqtt;
 
     @Activate
     public void start() throws Exception {
         LOG.info("Starting mqtt test component");
-        Publisher<byte[]> fromTopic = mqtt.from("input");
-        Subscriber<byte[]> toTopic = mqtt.to("output");
+        Publisher<byte[]> fromTopic = mqtt.from("input", byte[].class);
+        Subscriber<byte[]> toTopic = mqtt.to("output", byte[].class);
         Flux.from(fromTopic)
             .map(ByteArrayConverter::asInteger)
             .log()
diff --git a/rcomp-examples/src/test/java/examples/TestRs.java b/rcomp-examples/src/test/java/examples/TestRs.java
index 96aa543..feef726 100644
--- a/rcomp-examples/src/test/java/examples/TestRs.java
+++ b/rcomp-examples/src/test/java/examples/TestRs.java
@@ -52,6 +52,7 @@
 
         ServletHandler handler = new ServletHandler();
         Servlet servlet = new HttpServlet() {
+            private static final long serialVersionUID = 1L;
 
             @Override
             protected void doGet(HttpServletRequest req, HttpServletResponse resp)
diff --git a/rcomp-examples/src/test/java/examples/TestServlet.java b/rcomp-examples/src/test/java/examples/TestServlet.java
new file mode 100644
index 0000000..b945985
--- /dev/null
+++ b/rcomp-examples/src/test/java/examples/TestServlet.java
@@ -0,0 +1,72 @@
+package examples;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.Assert;
+import org.junit.Test;
+
+import reactor.core.publisher.Mono;
+
+public class TestServlet {
+
+    @Test
+    public void testServlet() throws Exception {
+        Server server = createServer();
+        server.start();
+        WebTarget client = ClientBuilder.newClient().target("http://localhost:8384/Hello");
+        String result = client.request().get(String.class);
+        Assert.assertEquals("/Hello", result);
+        server.stop();
+    }
+    
+    private final class TestAsyncServlet extends HttpServlet {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+            final AsyncContext ctx = req.startAsync(req, resp);
+            Mono.just(req.getServletPath())
+               .delayElement(Duration.ofMillis(2000))
+               .doOnSuccess(result -> writeResult(ctx, result))
+               .log()
+               .subscribe();
+            System.out.println("Returning from servlet. Request is handled asynchronously");
+        }
+
+        private void writeResult(AsyncContext ctx, String result) {
+            try {
+                ctx.getResponse().getWriter().append(result);
+                ctx.complete();
+            } catch (IOException e) {
+
+            }
+        }
+    }
+    
+    
+    private Server createServer() {
+        Server server = new Server();
+        SelectChannelConnector connector = new SelectChannelConnector();
+        connector.setPort(8384);
+        server.addConnector(connector);
+
+        ServletContextHandler handler = new ServletContextHandler();
+        handler.setContextPath("/");
+        handler.addServlet(new ServletHolder(new TestAsyncServlet()), "/");
+        server.setHandler(handler);
+        return server;
+    }
+}
diff --git a/rcomp-mail/src/main/java/component/mail/MailComponent.java b/rcomp-mail/src/main/java/component/mail/MailComponent.java
index 81986c2..62ea322 100644
--- a/rcomp-mail/src/main/java/component/mail/MailComponent.java
+++ b/rcomp-mail/src/main/java/component/mail/MailComponent.java
@@ -1,7 +1,6 @@
 package component.mail;
 
 import javax.mail.Session;
-import javax.mail.internet.MimeMessage;
 
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
@@ -11,19 +10,19 @@
 import component.api.MComponent;
 
 @Component(property="name=mail")
-public class MailComponent implements MComponent<MimeMessage> {
+public class MailComponent implements MComponent {
     
     @Reference
     Session session;
 
     @Override
-    public Publisher<MimeMessage> from(String topic) {
+    public <T> Publisher<T> from(String topic, Class<? extends T> type) {
         throw new RuntimeException();
     }  
     
     @Override
-    public Subscriber<MimeMessage> to(String destination) {
-        return new MailDestination(destination);
+    public <T> Subscriber<T> to(String destination, Class<? extends T> type) {
+        return new MailDestination<T>(destination, type);
     }
 
 }
diff --git a/rcomp-mail/src/main/java/component/mail/MailDestination.java b/rcomp-mail/src/main/java/component/mail/MailDestination.java
index d9ebf27..15d38c6 100644
--- a/rcomp-mail/src/main/java/component/mail/MailDestination.java
+++ b/rcomp-mail/src/main/java/component/mail/MailDestination.java
@@ -1,6 +1,7 @@
 package component.mail;
 
 import javax.mail.Address;
+import javax.mail.Message;
 import javax.mail.Transport;
 import javax.mail.internet.InternetAddress;
 import javax.mail.internet.MimeMessage;
@@ -8,13 +9,16 @@
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-public class MailDestination implements Subscriber<MimeMessage> {
+public class MailDestination<T> implements Subscriber<T> {
 
     private String destination;
     private Subscription subscription;
 
-    public MailDestination(String destination) {
+    public MailDestination(String destination, Class<? extends T> type) {
         this.destination = destination;
+        if (! type.equals(MimeMessage.class)) {
+            throw new IllegalArgumentException("Curently only MimeMessage is supported");
+        }
     }
 
     @Override
@@ -24,10 +28,10 @@
     }
 
     @Override
-    public void onNext(MimeMessage message) {
+    public void onNext(T message) {
         try {
             Address[] addresses = new Address[]{new InternetAddress(destination)};
-            Transport.send(message, addresses);
+            Transport.send(convertTo(message), addresses);
         } catch (Exception e) {
             throw new RuntimeException(e);
         } finally {
@@ -35,6 +39,10 @@
         }
     }
 
+    private Message convertTo(T message) {
+        return (Message) message;
+    }
+
     @Override
     public void onError(Throwable t) {
     }
diff --git a/rcomp-mail/src/test/java/component/mail/TestMail.java b/rcomp-mail/src/test/java/component/mail/TestMail.java
index d27e4c0..dbc6a99 100644
--- a/rcomp-mail/src/test/java/component/mail/TestMail.java
+++ b/rcomp-mail/src/test/java/component/mail/TestMail.java
@@ -25,7 +25,7 @@
         MailComponent mail = new MailComponent();
         mail.session = session;
 
-        Subscriber<MimeMessage> to = mail.to("cschneider@localhost");
+        Subscriber<MimeMessage> to = mail.to("cschneider@localhost", MimeMessage.class);
         Flux.just("Test").map(txt -> createMessage(session, txt)).subscribe(to);
     }
 
diff --git a/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
index f35f492..352475f 100644
--- a/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
@@ -12,7 +12,7 @@
 import component.api.MComponent;
 
 @Component(property="name=mqtt")
-public class MqttComponent implements MComponent<byte[]> {
+public class MqttComponent implements MComponent {
     
     MqttClient client;
 
@@ -35,13 +35,13 @@
     }
 
     @Override
-    public Publisher<byte[]> from(String topic) {
-        return new MqttSource(client, topic);
+    public <T> Publisher<T> from(String topic, Class<? extends T> type) {
+        return new MqttSource<T>(client, topic, type);
     }  
     
     @Override
-    public Subscriber<byte[]> to(String topic) {
-        return new MqttDestination(client, topic);
+    public <T> Subscriber<T> to(String topic, Class<? extends T> type) {
+        return new MqttDestination<T>(client, topic, type);
     }
 
 }
diff --git a/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
index daa0c57..52236cd 100644
--- a/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
@@ -5,15 +5,18 @@
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-public class MqttDestination implements Subscriber<byte[]> {
+public class MqttDestination<T> implements Subscriber<T> {
     
     private MqttClient client;
     private String topic;
     private Subscription subscription;
 
-    public MqttDestination(MqttClient client, String topic) {
+    public MqttDestination(MqttClient client, String topic, Class<? extends T> type) {
         this.client = client;
         this.topic = topic;
+        if (! type.equals(byte[].class)) {
+            throw new IllegalArgumentException("Curently only byte[] is supported");
+        }
     }
 
     @Override
@@ -23,9 +26,9 @@
     }
 
     @Override
-    public void onNext(byte[] payload) {
+    public void onNext(T payload) {
         try {
-            MqttMessage message = new MqttMessage(payload);
+            MqttMessage message = new MqttMessage(convertTo(payload));
             this.client.publish(topic, message);
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -34,6 +37,10 @@
         }
     }
 
+    private byte[] convertTo(T payload) {
+        return (byte[])payload;
+    }
+
     @Override
     public void onError(Throwable t) {
         System.out.println("onerr");
diff --git a/rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java
index 5b81963..1ac3e74 100644
--- a/rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java
@@ -10,25 +10,28 @@
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-public class MqttSource implements Publisher<byte[]> {
+public class MqttSource<T> implements Publisher<T> {
     private MqttClient client;
     private String topic;
     
-    public MqttSource(MqttClient client, String topic) {
+    public MqttSource(MqttClient client, String topic, Class<? extends T> type) {
         this.client = client;
         this.topic = topic;
+        if (! type.equals(byte[].class)) {
+            throw new IllegalArgumentException("Curently only byte[] is supported");
+        }
     }
     
     @Override
-    public void subscribe(Subscriber<? super byte[]> subscriber) {
+    public void subscribe(Subscriber<? super T> subscriber) {
         subscriber.onSubscribe(new MqttSubscription(subscriber));
     }
 
     public class MqttSubscription implements IMqttMessageListener, Subscription {
         private AtomicBoolean subScribed;
-        private Subscriber<? super byte[]> subscriber;
+        private Subscriber<? super T> subscriber;
         
-        public MqttSubscription(Subscriber<? super byte[]> subscriber) {
+        public MqttSubscription(Subscriber<? super T> subscriber) {
             this.subscriber = subscriber;
             this.subScribed = new AtomicBoolean(false);
         }
@@ -57,7 +60,17 @@
         
         @Override
         public void messageArrived(String topic, MqttMessage message) throws Exception {
-            this.subscriber.onNext(message.getPayload());
+            this.subscriber.onNext(converTo(message.getPayload()));
+        }
+
+        /**
+         * Simple conversion that just supports byte[]
+         * @param payload
+         * @return
+         */
+        @SuppressWarnings("unchecked")
+        private T converTo(byte[] payload) {
+            return (T) payload; 
         }
 
     }
diff --git a/rcomp-mqtt/src/test/java/component/mqtt/Test1.java b/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
index 32a1fe0..2674c3f 100644
--- a/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
+++ b/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
@@ -23,8 +23,8 @@
         client.connect();
         MqttComponent mqtt = new MqttComponent();
         mqtt.client = client;
-        Publisher<byte[]> fromTopic = mqtt.from("input");
-        Subscriber<byte[]> toTopic = mqtt.to("output");
+        Publisher<byte[]> fromTopic = mqtt.from("input", byte[].class);
+        Subscriber<byte[]> toTopic = mqtt.to("output", byte[].class);
         Flux.from(fromTopic)
             .log()
             .subscribe(toTopic);