Split into bundles and add test with cxf jax-rs
diff --git a/reactortest/README.md b/README.md
similarity index 76%
rename from reactortest/README.md
rename to README.md
index daa86e0..cf43bc0 100644
--- a/reactortest/README.md
+++ b/README.md
@@ -28,11 +28,16 @@
 install -s mvn:io.projectreactor/reactor-core/3.0.7.RELEASE
 install -s wrap:mvn:io.projectreactor.addons/reactor-extra/3.0.7.RELEASE
 install -s mvn:javax.mail/mail/1.5.0-b01
-install -s mvn:net.lr/reactortest/0.0.1-SNAPSHOT
+
+install -s mvn:net.lr.reactive.component/rcomp-api/1.0.0-SNAPSHOT
+install -s mvn:net.lr.reactive.component/rcomp-mqtt/1.0.0-SNAPSHOT
+install -s mvn:net.lr.reactive.component/rcomp-eventadmin/1.0.0-SNAPSHOT
+install -s mvn:net.lr.reactive.component/rcomp-examples/1.0.0-SNAPSHOT
 ```
 
 ## Test
 
+## Mqtt
 Start mqtt client
 
 Subscribe to topic "output". 
@@ -40,4 +45,11 @@
 
 You should receive the following on the topic output: "1.5", "2.5"
 
+# EventAdmin
 
+```
+event:send input a=b
+log:tail
+```
+
+The log should show that the event was received.
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..b6359d4
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,108 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>net.lr.reactive.component</groupId>
+    <artifactId>rcomp-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <properties>
+    </properties>
+
+    <modules>
+        <module>rcomp-api</module>    
+        <module>rcomp-eventadmin</module>
+        <module>rcomp-mqtt</module>
+        <module>rcomp-mail</module>
+        <module>rcomp-examples</module>
+    </modules>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.reactivestreams</groupId>
+            <artifactId>reactive-streams</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.21</version>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>osgi.core</artifactId>
+            <version>6.0.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>osgi.cmpn</artifactId>
+            <version>6.0.0</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-jdk14</artifactId>
+            <version>1.7.21</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>net.lr.reactive.component</groupId>
+                <artifactId>rcomp-api</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.projectreactor</groupId>
+                <artifactId>reactor-core</artifactId>
+                <version>3.0.7.RELEASE</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>biz.aQute.bnd</groupId>
+                <artifactId>bnd-maven-plugin</artifactId>
+                <version>3.3.0</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>bnd-process</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <archive>
+                        <manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/rcomp-api/bnd.bnd b/rcomp-api/bnd.bnd
new file mode 100644
index 0000000..95fa3f2
--- /dev/null
+++ b/rcomp-api/bnd.bnd
@@ -0,0 +1 @@
+Export-Package: component.api
\ No newline at end of file
diff --git a/rcomp-api/pom.xml b/rcomp-api/pom.xml
new file mode 100644
index 0000000..587acbc
--- /dev/null
+++ b/rcomp-api/pom.xml
@@ -0,0 +1,9 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>net.lr.reactive.component</groupId>
+    <artifactId>rcomp-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>rcomp-api</artifactId>
+</project>
\ No newline at end of file
diff --git a/rcomp-api/src/main/java/component/api/MComponent.java b/rcomp-api/src/main/java/component/api/MComponent.java
new file mode 100644
index 0000000..67d0ea8
--- /dev/null
+++ b/rcomp-api/src/main/java/component/api/MComponent.java
@@ -0,0 +1,9 @@
+package component.api;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
+public interface MComponent<T> {
+    Publisher<T> from(String destination);
+    Subscriber<T> to(String destination);
+}
diff --git a/rcomp-eventadmin/pom.xml b/rcomp-eventadmin/pom.xml
new file mode 100644
index 0000000..1ec74dc
--- /dev/null
+++ b/rcomp-eventadmin/pom.xml
@@ -0,0 +1,17 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>net.lr.reactive.component</groupId>
+        <artifactId>rcomp-parent</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>rcomp-eventadmin</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>net.lr.reactive.component</groupId>
+            <artifactId>rcomp-api</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java
new file mode 100644
index 0000000..5b264a0
--- /dev/null
+++ b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java
@@ -0,0 +1,44 @@
+package component.eventadmin;
+
+import java.util.Map;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
+import component.api.MComponent;
+
+@Component(property="name=eventAdmin")
+public class EventAdminComponent implements MComponent<Map<String, ?>> {
+    
+    private BundleContext context;
+
+    @Reference
+    EventAdmin client;
+    
+
+    @Activate
+    public void activate(BundleContext context) {
+        this.context = context;
+    }
+    
+    @Deactivate
+    public void deactivate() {
+    }
+
+    @Override
+    public Publisher<Map<String, ?>> from(String topic) {
+        return new EventAdminSource(context, topic);
+    }  
+    
+    @Override
+    public Subscriber<Map<String, ?>> to(String topic) {
+        return new EventAdminDestination(client, topic);
+    }
+
+}
diff --git a/reactortest/src/main/java/component/eventadmin/EventAdminDestination.java b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java
similarity index 72%
rename from reactortest/src/main/java/component/eventadmin/EventAdminDestination.java
rename to rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java
index a1a8b96..0e7f667 100644
--- a/reactortest/src/main/java/component/eventadmin/EventAdminDestination.java
+++ b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java
@@ -1,24 +1,21 @@
 package component.eventadmin;
 
 import java.util.Map;
-import java.util.function.Function;
 
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-public class EventAdminDestination<T> implements Subscriber<T> {
+public class EventAdminDestination implements Subscriber<Map<String, ?>> {
     
     private EventAdmin client;
     private String topic;
-    private Function<T, Map<String, ?>> converter;
     private Subscription subscription;
 
-    public EventAdminDestination(EventAdmin client, String topic, Function<T, Map<String, ?>> converter) {
+    public EventAdminDestination(EventAdmin client, String topic) {
         this.client = client;
         this.topic = topic;
-        this.converter = converter;
     }
 
     @Override
@@ -28,9 +25,9 @@
     }
 
     @Override
-    public void onNext(T payload) {
+    public void onNext(Map<String, ?> payload) {
         try {
-            Event event = new Event(topic, converter.apply(payload));
+            Event event = new Event(topic, payload);
             this.client.sendEvent(event);
         } catch (Exception e) {
             throw new RuntimeException(e);
diff --git a/reactortest/src/main/java/component/eventadmin/EventAdminSource.java b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java
similarity index 73%
rename from reactortest/src/main/java/component/eventadmin/EventAdminSource.java
rename to rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java
index 80bc7b7..bbb560b 100644
--- a/reactortest/src/main/java/component/eventadmin/EventAdminSource.java
+++ b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java
@@ -5,7 +5,6 @@
 import java.util.Hashtable;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
 
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
@@ -16,28 +15,26 @@
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-public class EventAdminSource <T> implements Publisher<T> {
+public class EventAdminSource implements Publisher<Map<String, ?>> {
     private BundleContext context;
-    private Function<Map<String, ?>, T> converter;
     private String topic;
     
-    public EventAdminSource(BundleContext context, String topic, Function<Map<String, ?>, T> converter) {
+    public EventAdminSource(BundleContext context, String topic) {
         this.context = context;
         this.topic = topic;
-        this.converter = converter;
     }
     
     @Override
-    public void subscribe(Subscriber<? super T> subscriber) {
-        subscriber.onSubscribe(new MqttSubscription(subscriber));
+    public void subscribe(Subscriber<? super Map<String, ?>> subscriber) {
+        subscriber.onSubscribe(new EventAdminSubscription(subscriber));
     }
 
-    public class MqttSubscription implements Subscription, EventHandler {
+    public class EventAdminSubscription implements Subscription, EventHandler {
         private AtomicBoolean subScribed;
         private ServiceRegistration<EventHandler> sreg;
-        private Subscriber<? super T> subscriber;
+        private Subscriber<? super Map<String, ?>> subscriber;
         
-        public MqttSubscription(Subscriber<? super T> subscriber) {
+        public EventAdminSubscription(Subscriber<? super Map<String, ?>> subscriber) {
             this.subscriber = subscriber;
             this.subScribed = new AtomicBoolean(false);
         }
@@ -64,8 +61,7 @@
         
         @Override
         public void handleEvent(Event event) {
-            T payLoad = converter.apply(toMap(event)); 
-            this.subscriber.onNext(payLoad);
+            this.subscriber.onNext(toMap(event));
         }
 
         Map<String, ?> toMap(Event event) {
diff --git a/rcomp-examples/bnd.bnd b/rcomp-examples/bnd.bnd
new file mode 100644
index 0000000..8b6f883
--- /dev/null
+++ b/rcomp-examples/bnd.bnd
@@ -0,0 +1 @@
+Web-ContextPath:/rcomp-examples
diff --git a/rcomp-examples/pom.xml b/rcomp-examples/pom.xml
new file mode 100644
index 0000000..f726a2c
--- /dev/null
+++ b/rcomp-examples/pom.xml
@@ -0,0 +1,52 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>net.lr.reactive.component</groupId>
+        <artifactId>rcomp-parent</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>rcomp-examples</artifactId>
+    <!-- <repositories> <repository> <id>spring-milestones</id> <url>http://repo.spring.io/milestone</url> 
+        <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> 
+        </snapshots> </repository> </repositories> -->
+
+    <dependencies>
+        <dependency>
+            <groupId>net.lr.reactive.component</groupId>
+            <artifactId>rcomp-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor.addons</groupId>
+            <artifactId>reactor-extra</artifactId>
+            <version>3.0.7.RELEASE</version>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor.addons</groupId>
+            <artifactId>reactor-adapter</artifactId>
+            <version>3.0.7.RELEASE</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-client</artifactId>
+            <version>3.2.0-SNAPSHOT</version>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+            <version>8.1.15.v20140411</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-servlet</artifactId>
+            <version>8.1.15.v20140411</version>
+        </dependency>
+
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/reactortest/src/main/java/reactortest/ByteArrayConverter.java b/rcomp-examples/src/main/java/reactortest/ByteArrayConverter.java
similarity index 100%
rename from reactortest/src/main/java/reactortest/ByteArrayConverter.java
rename to rcomp-examples/src/main/java/reactortest/ByteArrayConverter.java
diff --git a/reactortest/src/main/java/reactortest/DoubleConverter.java b/rcomp-examples/src/main/java/reactortest/DoubleConverter.java
similarity index 100%
rename from reactortest/src/main/java/reactortest/DoubleConverter.java
rename to rcomp-examples/src/main/java/reactortest/DoubleConverter.java
diff --git a/reactortest/src/main/java/reactortest/ExampleEventAdmin.java b/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
similarity index 84%
rename from reactortest/src/main/java/reactortest/ExampleEventAdmin.java
rename to rcomp-examples/src/main/java/reactortest/EventAdminExample.java
index 47ff7b7..f4e490d 100644
--- a/reactortest/src/main/java/reactortest/ExampleEventAdmin.java
+++ b/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
@@ -14,16 +14,16 @@
 import reactor.core.publisher.Flux;
 
 @Component(immediate=true)
-public class ExampleEventAdmin {
-    Logger LOG = LoggerFactory.getLogger(ExampleEventAdmin.class);
+public class EventAdminExample {
+    Logger LOG = LoggerFactory.getLogger(EventAdminExample.class);
     
     @Reference(target="(name=eventAdmin)")
     MComponent<Map<String, ? >> eventAdmin;
 
     @Activate
     public void start() throws Exception {
-        Publisher<Map<String, ?>> fromTopic = eventAdmin.from("input", Map2Map::convert);
-        Subscriber<Map<String, ?>> toTopic = eventAdmin.to("output", Map2Map::convert);
+        Publisher<Map<String, ?>> fromTopic = eventAdmin.from("input");
+        Subscriber<Map<String, ?>> toTopic = eventAdmin.to("output");
         Flux.from(fromTopic)
             .log()
             .subscribe(toTopic);
diff --git a/reactortest/src/main/java/reactortest/MqttExampleComponent.java b/rcomp-examples/src/main/java/reactortest/MqttExample.java
similarity index 73%
rename from reactortest/src/main/java/reactortest/MqttExampleComponent.java
rename to rcomp-examples/src/main/java/reactortest/MqttExample.java
index 8d079bd..feb214e 100644
--- a/reactortest/src/main/java/reactortest/MqttExampleComponent.java
+++ b/rcomp-examples/src/main/java/reactortest/MqttExample.java
@@ -13,8 +13,8 @@
 import reactor.math.MathFlux;
 
 @Component(immediate=true)
-public class MqttExampleComponent {
-    Logger LOG = LoggerFactory.getLogger(MqttExampleComponent.class);
+public class MqttExample {
+    Logger LOG = LoggerFactory.getLogger(MqttExample.class);
     
     @Reference(target="(name=mqtt)")
     MComponent<byte[]> mqtt;
@@ -22,12 +22,14 @@
     @Activate
     public void start() throws Exception {
         LOG.info("Starting mqtt test component");
-        Publisher<Integer> fromTopic = mqtt.from("input", ByteArrayConverter::asInteger);
-        Subscriber<Double> toTopic = mqtt.to("output", DoubleConverter::asByteAr);
+        Publisher<byte[]> fromTopic = mqtt.from("input");
+        Subscriber<byte[]> toTopic = mqtt.to("output");
         Flux.from(fromTopic)
+            .map(ByteArrayConverter::asInteger)
             .log()
             .window(2, 1)
             .flatMap(win -> MathFlux.averageDouble(win))
+            .map(DoubleConverter::asByteAr)
             .subscribe(toTopic);
         LOG.info("mqtt test component started");
     }
diff --git a/rcomp-examples/src/main/resources/1.txt b/rcomp-examples/src/main/resources/1.txt
new file mode 100644
index 0000000..5ab2f8a
--- /dev/null
+++ b/rcomp-examples/src/main/resources/1.txt
@@ -0,0 +1 @@
+Hello
\ No newline at end of file
diff --git a/rcomp-examples/src/main/resources/2.txt b/rcomp-examples/src/main/resources/2.txt
new file mode 100644
index 0000000..beef906
--- /dev/null
+++ b/rcomp-examples/src/main/resources/2.txt
@@ -0,0 +1 @@
+World
\ No newline at end of file
diff --git a/rcomp-examples/src/test/java/examples/Test1.java b/rcomp-examples/src/test/java/examples/Test1.java
new file mode 100644
index 0000000..808ead6
--- /dev/null
+++ b/rcomp-examples/src/test/java/examples/Test1.java
@@ -0,0 +1,36 @@
+package examples;
+
+
+import static java.time.Duration.of;
+
+import java.time.temporal.ChronoUnit;
+import java.util.function.Function;
+
+import org.junit.Test;
+
+import reactor.core.publisher.Flux;
+import reactor.math.MathFlux;
+
+public class Test1 {
+    double result = 0;
+
+    @Test
+    public void testStream() {
+        Flux<Integer> flux = Flux.fromArray(new Integer[]{1,10,5,3,4});
+        MathFlux.averageDouble(flux).subscribe(System.out::println);
+    }
+    
+    @Test
+    public void testSlidingWindow() throws InterruptedException {
+        Flux.interval(of(1, ChronoUnit.MILLIS))
+            .transform(averageOfLastTwo())
+            .subscribe(System.out::println);
+        Thread.sleep(100);
+    }
+    
+    private Function<Flux<Long>, Flux<Double>> averageOfLastTwo() {
+        return f -> f.window(2, 1)
+            .flatMap(win -> MathFlux.averageDouble(win));
+    }
+
+}
diff --git a/rcomp-examples/src/test/java/examples/TestRs.java b/rcomp-examples/src/test/java/examples/TestRs.java
new file mode 100644
index 0000000..96aa543
--- /dev/null
+++ b/rcomp-examples/src/test/java/examples/TestRs.java
@@ -0,0 +1,73 @@
+package examples;
+
+import static reactor.core.publisher.Mono.fromCompletionStage;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import javax.servlet.Servlet;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.servlet.ServletHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.Test;
+
+import reactor.core.publisher.Mono;
+
+public class TestRs {
+
+    @Test
+    public void testGet() throws Exception {
+        Server server = createServer();
+        server.start();
+        WebTarget client1 = ClientBuilder.newClient().target("http://localhost:8384/Hello");
+        WebTarget client2 = ClientBuilder.newClient().target("http://localhost:8384/World");
+        Mono<String> get1 = fromCompletionStage(client1.request().rx().get(String.class));
+        Mono<String> get2 = fromCompletionStage(client2.request().rx().get(String.class));
+        List<String> result = Mono
+            .from(get1)
+            .concatWith(get2)
+            .doOnError(ex -> ex.printStackTrace())
+            .collectList()
+            .block(Duration.ofMillis(1500));
+        String resultSt = result.stream().collect(Collectors.joining(" "));
+        org.junit.Assert.assertEquals("Hello World", resultSt);
+        server.stop();
+    }
+
+    private Server createServer() {
+        Server server = new Server();
+        SelectChannelConnector connector = new SelectChannelConnector();
+        connector.setPort(8384);
+        server.addConnector(connector);
+
+        ServletHandler handler = new ServletHandler();
+        Servlet servlet = new HttpServlet() {
+
+            @Override
+            protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+                throws ServletException, IOException {
+                String path = req.getServletPath().substring(1);
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                }
+                resp.getWriter().append(path);
+            }
+            
+        };
+        handler.addServletWithMapping(new ServletHolder(servlet), "/");
+        server.setHandler(handler);
+        return server;
+    }
+    
+}
diff --git a/rcomp-mail/pom.xml b/rcomp-mail/pom.xml
new file mode 100644
index 0000000..3bfb5aa
--- /dev/null
+++ b/rcomp-mail/pom.xml
@@ -0,0 +1,32 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>net.lr.reactive.component</groupId>
+        <artifactId>rcomp-parent</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>rcomp-mail</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>net.lr.reactive.component</groupId>
+            <artifactId>rcomp-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>javax.mail</groupId>
+            <artifactId>javax.mail-api</artifactId>
+            <version>1.5.6</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.mail</groupId>
+            <artifactId>mail</artifactId>
+            <version>1.5.0-b01</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/rcomp-mail/src/main/java/component/mail/MailComponent.java b/rcomp-mail/src/main/java/component/mail/MailComponent.java
new file mode 100644
index 0000000..81986c2
--- /dev/null
+++ b/rcomp-mail/src/main/java/component/mail/MailComponent.java
@@ -0,0 +1,29 @@
+package component.mail;
+
+import javax.mail.Session;
+import javax.mail.internet.MimeMessage;
+
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
+import component.api.MComponent;
+
+@Component(property="name=mail")
+public class MailComponent implements MComponent<MimeMessage> {
+    
+    @Reference
+    Session session;
+
+    @Override
+    public Publisher<MimeMessage> from(String topic) {
+        throw new RuntimeException();
+    }  
+    
+    @Override
+    public Subscriber<MimeMessage> to(String destination) {
+        return new MailDestination(destination);
+    }
+
+}
diff --git a/reactortest/src/main/java/component/mail/MailDestination.java b/rcomp-mail/src/main/java/component/mail/MailDestination.java
similarity index 71%
rename from reactortest/src/main/java/component/mail/MailDestination.java
rename to rcomp-mail/src/main/java/component/mail/MailDestination.java
index 33dd1da..d9ebf27 100644
--- a/reactortest/src/main/java/component/mail/MailDestination.java
+++ b/rcomp-mail/src/main/java/component/mail/MailDestination.java
@@ -1,7 +1,5 @@
 package component.mail;
 
-import java.util.function.Function;
-
 import javax.mail.Address;
 import javax.mail.Transport;
 import javax.mail.internet.InternetAddress;
@@ -10,15 +8,13 @@
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-public class MailDestination<T> implements Subscriber<T> {
+public class MailDestination implements Subscriber<MimeMessage> {
 
     private String destination;
-    private Function<T, MimeMessage> converter;
     private Subscription subscription;
 
-    public MailDestination(String destination, Function<T, MimeMessage> converter) {
+    public MailDestination(String destination) {
         this.destination = destination;
-        this.converter = converter;
     }
 
     @Override
@@ -28,9 +24,8 @@
     }
 
     @Override
-    public void onNext(T payload) {
+    public void onNext(MimeMessage message) {
         try {
-            MimeMessage message = converter.apply(payload);
             Address[] addresses = new Address[]{new InternetAddress(destination)};
             Transport.send(message, addresses);
         } catch (Exception e) {
diff --git a/reactortest/src/main/java/component/mail/SessionComponent.java b/rcomp-mail/src/main/java/component/mail/SessionComponent.java
similarity index 92%
rename from reactortest/src/main/java/component/mail/SessionComponent.java
rename to rcomp-mail/src/main/java/component/mail/SessionComponent.java
index 9ade977..d3be1dc 100644
--- a/reactortest/src/main/java/component/mail/SessionComponent.java
+++ b/rcomp-mail/src/main/java/component/mail/SessionComponent.java
@@ -5,7 +5,6 @@
 
 import javax.mail.Session;
 
-import org.eclipse.paho.client.mqttv3.MqttException;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.annotations.Activate;
@@ -18,7 +17,7 @@
     private ServiceRegistration<Session> sreg;
 
     @Activate
-    public void activate(Map<String, Object> config, BundleContext context) throws MqttException {
+    public void activate(Map<String, Object> config, BundleContext context) {
         Session session = create(config);
         sreg = context.registerService(Session.class, session, null);
     }
diff --git a/reactortest/src/test/java/component/mail/TestMail.java b/rcomp-mail/src/test/java/component/mail/TestMail.java
similarity index 87%
rename from reactortest/src/test/java/component/mail/TestMail.java
rename to rcomp-mail/src/test/java/component/mail/TestMail.java
index a37fec0..d27e4c0 100644
--- a/reactortest/src/test/java/component/mail/TestMail.java
+++ b/rcomp-mail/src/test/java/component/mail/TestMail.java
@@ -25,8 +25,8 @@
         MailComponent mail = new MailComponent();
         mail.session = session;
 
-        Subscriber<String> to = mail.to("cschneider@localhost", txt -> createMessage(session, txt));
-        Flux.just("Test").subscribe(to);
+        Subscriber<MimeMessage> to = mail.to("cschneider@localhost");
+        Flux.just("Test").map(txt -> createMessage(session, txt)).subscribe(to);
     }
 
     private MimeMessage createMessage(Session session, String body) {
diff --git a/rcomp-mqtt/pom.xml b/rcomp-mqtt/pom.xml
new file mode 100644
index 0000000..080ae04
--- /dev/null
+++ b/rcomp-mqtt/pom.xml
@@ -0,0 +1,25 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>net.lr.reactive.component</groupId>
+        <artifactId>rcomp-parent</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>rcomp-mqtt</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>net.lr.reactive.component</groupId>
+            <artifactId>rcomp-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/reactortest/src/main/java/component/mqtt/MqttComponent.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
similarity index 76%
rename from reactortest/src/main/java/component/mqtt/MqttComponent.java
rename to rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
index a4c3b46..f35f492 100644
--- a/reactortest/src/main/java/component/mqtt/MqttComponent.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
@@ -1,7 +1,5 @@
 package component.mqtt;
 
-import java.util.function.Function;
-
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.osgi.service.component.annotations.Activate;
@@ -37,13 +35,13 @@
     }
 
     @Override
-    public <T> Publisher<T> from(String topic, Function<byte[], T> converter) throws Exception {
-        return new MqttSource<T>(client, topic, converter);
+    public Publisher<byte[]> from(String topic) {
+        return new MqttSource(client, topic);
     }  
     
     @Override
-    public <T> Subscriber<T> to(String topic, Function<T, byte[]> converter) throws Exception {
-        return new MqttDestination<T>(client, topic, converter);
+    public Subscriber<byte[]> to(String topic) {
+        return new MqttDestination(client, topic);
     }
 
 }
diff --git a/reactortest/src/main/java/component/mqtt/MqttDestination.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
similarity index 70%
rename from reactortest/src/main/java/component/mqtt/MqttDestination.java
rename to rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
index d5a874d..daa0c57 100644
--- a/reactortest/src/main/java/component/mqtt/MqttDestination.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
@@ -1,23 +1,19 @@
 package component.mqtt;
 
-import java.util.function.Function;
-
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-public class MqttDestination<T> implements Subscriber<T> {
+public class MqttDestination implements Subscriber<byte[]> {
     
     private MqttClient client;
     private String topic;
-    private Function<T, byte[]> converter;
     private Subscription subscription;
 
-    public MqttDestination(MqttClient client, String topic, Function<T, byte[]> converter) {
+    public MqttDestination(MqttClient client, String topic) {
         this.client = client;
         this.topic = topic;
-        this.converter = converter;
     }
 
     @Override
@@ -27,9 +23,9 @@
     }
 
     @Override
-    public void onNext(T payload) {
+    public void onNext(byte[] payload) {
         try {
-            MqttMessage message = new MqttMessage(converter.apply(payload));
+            MqttMessage message = new MqttMessage(payload);
             this.client.publish(topic, message);
         } catch (Exception e) {
             throw new RuntimeException(e);
diff --git a/reactortest/src/main/java/component/mqtt/MqttSource.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java
similarity index 74%
rename from reactortest/src/main/java/component/mqtt/MqttSource.java
rename to rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java
index 6e72acd..5b81963 100644
--- a/reactortest/src/main/java/component/mqtt/MqttSource.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java
@@ -1,7 +1,6 @@
 package component.mqtt;
 
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
 
 import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
 import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -11,27 +10,25 @@
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-public class MqttSource <T> implements Publisher<T> {
-    private Function<byte[], T> converter;
+public class MqttSource implements Publisher<byte[]> {
     private MqttClient client;
     private String topic;
     
-    public MqttSource(MqttClient client, String topic, Function<byte[], T> converter) {
+    public MqttSource(MqttClient client, String topic) {
         this.client = client;
         this.topic = topic;
-        this.converter = converter;
     }
     
     @Override
-    public void subscribe(Subscriber<? super T> subscriber) {
+    public void subscribe(Subscriber<? super byte[]> subscriber) {
         subscriber.onSubscribe(new MqttSubscription(subscriber));
     }
 
     public class MqttSubscription implements IMqttMessageListener, Subscription {
         private AtomicBoolean subScribed;
-        private Subscriber<? super T> subscriber;
+        private Subscriber<? super byte[]> subscriber;
         
-        public MqttSubscription(Subscriber<? super T> subscriber) {
+        public MqttSubscription(Subscriber<? super byte[]> subscriber) {
             this.subscriber = subscriber;
             this.subScribed = new AtomicBoolean(false);
         }
@@ -60,8 +57,7 @@
         
         @Override
         public void messageArrived(String topic, MqttMessage message) throws Exception {
-            T payLoad = converter.apply(message.getPayload()); 
-            this.subscriber.onNext(payLoad);
+            this.subscriber.onNext(message.getPayload());
         }
 
     }
diff --git a/rcomp-mqtt/src/test/java/component/mqtt/Test1.java b/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
new file mode 100644
index 0000000..32a1fe0
--- /dev/null
+++ b/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
@@ -0,0 +1,43 @@
+package component.mqtt;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.junit.Assert;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
+import reactor.core.publisher.Flux;
+
+public class Test1 {
+    Integer result = 0;
+
+    @Test
+    public void testMQtt() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        MqttClient client = new MqttClient("tcp://192.168.0.126:1883", MqttClient.generateClientId(), new MemoryPersistence());
+        client.connect();
+        MqttComponent mqtt = new MqttComponent();
+        mqtt.client = client;
+        Publisher<byte[]> fromTopic = mqtt.from("input");
+        Subscriber<byte[]> toTopic = mqtt.to("output");
+        Flux.from(fromTopic)
+            .log()
+            .subscribe(toTopic);
+        
+        client.subscribe("output", (topic, message) -> {
+            result = new Integer(new String(message.getPayload()));
+            latch.countDown();
+        });
+        client.publish("input", new MqttMessage(new Integer(2).toString().getBytes()));
+        client.publish("input", new MqttMessage(new Integer(2).toString().getBytes()));
+        latch.await(100, TimeUnit.SECONDS);
+        Assert.assertEquals(2, result, 0.1);
+        client.disconnect();
+        client.close();
+    }
+}
diff --git a/reactortest/pom.xml b/reactortest/pom.xml
deleted file mode 100644
index 7029440..0000000
--- a/reactortest/pom.xml
+++ /dev/null
@@ -1,129 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <groupId>net.lr</groupId>
-    <artifactId>reactortest</artifactId>
-    <version>0.0.1-SNAPSHOT</version>
-
-    <repositories>
-        <repository>
-            <id>spring-milestones</id>
-            <url>http://repo.spring.io/milestone</url>
-            <releases>
-                <enabled>true</enabled>
-            </releases>
-            <snapshots>
-                <enabled>false</enabled>
-            </snapshots>
-        </repository>
-    </repositories>
-
-    <build>
-        <plugins>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.5.1</version>
-                <configuration>
-                    <!-- http://maven.apache.org/plugins/maven-compiler-plugin/ -->
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>biz.aQute.bnd</groupId>
-                <artifactId>bnd-maven-plugin</artifactId>
-                <version>3.2.0</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>bnd-process</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <version>2.5</version>
-                <configuration>
-                    <archive>
-                        <manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
-                    </archive>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.osgi</groupId>
-            <artifactId>osgi.core</artifactId>
-            <version>6.0.0</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.osgi</groupId>
-            <artifactId>osgi.cmpn</artifactId>
-            <version>6.0.0</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <version>1.7.7</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>io.projectreactor</groupId>
-            <artifactId>reactor-core</artifactId>
-            <version>3.0.7.RELEASE</version>
-        </dependency>
-        <dependency>
-            <groupId>io.projectreactor.addons</groupId>
-            <artifactId>reactor-extra</artifactId>
-            <version>3.0.7.RELEASE</version>
-        </dependency>
-        <dependency>
-            <groupId>io.projectreactor.addons</groupId>
-            <artifactId>reactor-adapter</artifactId>
-            <version>3.0.7.RELEASE</version>
-        </dependency>
-        <dependency>
-            <groupId>io.projectreactor.kafka</groupId>
-            <artifactId>reactor-kafka</artifactId>
-            <version>1.0.0.M2</version>
-        </dependency>
-        <dependency>
-            <groupId>org.eclipse.paho</groupId>
-            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
-            <version>1.1.1</version>
-        </dependency>
-        <dependency>
-            <groupId>javax.mail</groupId>
-            <artifactId>javax.mail-api</artifactId>
-            <version>1.5.6</version>
-        </dependency>
-        <dependency>
-            <groupId>javax.mail</groupId>
-            <artifactId>mail</artifactId>
-            <version>1.5.0-b01</version>
-        </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-simple</artifactId>
-            <version>1.7.7</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-</project>
\ No newline at end of file
diff --git a/reactortest/src/main/java/component/api/MComponent.java b/reactortest/src/main/java/component/api/MComponent.java
deleted file mode 100644
index 7f7027c..0000000
--- a/reactortest/src/main/java/component/api/MComponent.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package component.api;
-
-import java.util.function.Function;
-
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-
-public interface MComponent<F> {
-    <T> Publisher<T> from(String destination, Function<F, T> converter) throws Exception;
-    <T> Subscriber<T> to(String destination, Function<T, F> converter) throws Exception;
-}
diff --git a/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java b/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java
deleted file mode 100644
index c2ce58f..0000000
--- a/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package component.eventadmin;
-
-import java.util.Map;
-import java.util.function.Function;
-
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.event.EventAdmin;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-
-import component.api.MComponent;
-
-@Component(property="name=eventAdmin")
-public class EventAdminComponent implements MComponent<Map<String, ?>> {
-    
-    private BundleContext context;
-
-    @Reference
-    EventAdmin client;
-    
-
-    @Activate
-    public void activate(BundleContext context) throws MqttException {
-        this.context = context;
-    }
-    
-    @Deactivate
-    public void deactivate() throws MqttException {
-    }
-
-    @Override
-    public <T> Publisher<T> from(String topic, Function<Map<String, ?>, T> converter) throws Exception {
-        return new EventAdminSource<T>(context, topic, converter);
-    }  
-    
-    @Override
-    public <T> Subscriber<T> to(String topic, Function<T, Map<String, ?>> converter) throws Exception {
-        return new EventAdminDestination<T>(client, topic, converter);
-    }
-
-}
diff --git a/reactortest/src/main/java/component/mail/MailComponent.java b/reactortest/src/main/java/component/mail/MailComponent.java
deleted file mode 100644
index a932271..0000000
--- a/reactortest/src/main/java/component/mail/MailComponent.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package component.mail;
-
-import java.util.function.Function;
-
-import javax.mail.MethodNotSupportedException;
-import javax.mail.Session;
-import javax.mail.internet.MimeMessage;
-
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-
-import component.api.MComponent;
-
-@Component(property="name=mail")
-public class MailComponent implements MComponent<MimeMessage> {
-    
-    @Reference
-    Session session;
-
-    @Override
-    public <T> Publisher<T> from(String topic, Function<MimeMessage, T> converter) throws Exception {
-        throw new MethodNotSupportedException();
-    }  
-    
-    @Override
-    public <T> Subscriber<T> to(String destination, Function<T, MimeMessage> converter) throws Exception {
-        return new MailDestination<T>(destination, converter);
-    }
-
-}
diff --git a/reactortest/src/main/java/reactortest/Map2Map.java b/reactortest/src/main/java/reactortest/Map2Map.java
deleted file mode 100644
index 3acc724..0000000
--- a/reactortest/src/main/java/reactortest/Map2Map.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package reactortest;
-
-import java.util.Map;
-
-public class Map2Map {
-    public static Map<String, ?> convert(Map<String, ?> source) {
-        return source;
-    }
-}
diff --git a/reactortest/src/test/java/component/mqtt/Test1.java b/reactortest/src/test/java/component/mqtt/Test1.java
deleted file mode 100644
index 2c15d1f..0000000
--- a/reactortest/src/test/java/component/mqtt/Test1.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package component.mqtt;
-
-import static java.time.Duration.of;
-
-import java.time.temporal.ChronoUnit;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.junit.Assert;
-import org.junit.Test;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-
-import reactor.core.publisher.Flux;
-import reactor.math.MathFlux;
-import reactortest.ByteArrayConverter;
-import reactortest.DoubleConverter;
-
-public class Test1 {
-    double result = 0;
-
-    @Test
-    public void testStream() {
-        Flux<Integer> flux = Flux.fromArray(new Integer[]{1,10,5,3,4});
-        MathFlux.averageDouble(flux).subscribe(System.out::println);
-    }
-    
-    @Test
-    public void testSlidingWindow() throws InterruptedException {
-        Flux.interval(of(1, ChronoUnit.MILLIS))
-        .transform(averageOfLastTwo())
-        .subscribe(System.out::println);
-        Thread.sleep(100);
-    }
-    
-    private Function<Flux<Long>, Flux<Double>> averageOfLastTwo() {
-        return f -> f.window(2, 1)
-            .flatMap(win -> MathFlux.averageDouble(win));
-    }
-
-    @Test
-    public void testMQtt() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
-        MqttClient client = new MqttClient("tcp://192.168.0.126:1883", MqttClient.generateClientId());
-        client.connect();
-        MqttComponent mqtt = new MqttComponent();
-        mqtt.client = client;
-        Publisher<Integer> fromTopic = mqtt.from("input", ByteArrayConverter::asInteger);
-        Subscriber<Double> toTopic = mqtt.to("output", DoubleConverter::asByteAr);
-        Flux.from(fromTopic)
-            .log()
-            .window(2, 1)
-            .flatMap(win -> MathFlux.averageDouble(win))
-            .log()
-            .subscribe(toTopic);
-        
-        client.subscribe("output", (topic, message) -> {
-            result = ByteArrayConverter.asDouble(message.getPayload());
-            latch.countDown();
-        });
-        client.publish("input", new MqttMessage(ByteArrayConverter.fromInteger(2)));
-        client.publish("input", new MqttMessage(new Integer(2).toString().getBytes()));
-        latch.await(10, TimeUnit.SECONDS);
-        Assert.assertEquals(2, result, 0.1);
-        client.disconnect();
-        client.close();
-    }
-}