Split into bundles and add test with cxf jax-rs
diff --git a/reactortest/README.md b/README.md
similarity index 76%
rename from reactortest/README.md
rename to README.md
index daa86e0..cf43bc0 100644
--- a/reactortest/README.md
+++ b/README.md
@@ -28,11 +28,16 @@
install -s mvn:io.projectreactor/reactor-core/3.0.7.RELEASE
install -s wrap:mvn:io.projectreactor.addons/reactor-extra/3.0.7.RELEASE
install -s mvn:javax.mail/mail/1.5.0-b01
-install -s mvn:net.lr/reactortest/0.0.1-SNAPSHOT
+
+install -s mvn:net.lr.reactive.component/rcomp-api/1.0.0-SNAPSHOT
+install -s mvn:net.lr.reactive.component/rcomp-mqtt/1.0.0-SNAPSHOT
+install -s mvn:net.lr.reactive.component/rcomp-eventadmin/1.0.0-SNAPSHOT
+install -s mvn:net.lr.reactive.component/rcomp-examples/1.0.0-SNAPSHOT
```
## Test
+## Mqtt
Start mqtt client
Subscribe to topic "output".
@@ -40,4 +45,11 @@
You should receive the following on the topic output: "1.5", "2.5"
+# EventAdmin
+```
+event:send input a=b
+log:tail
+```
+
+The log should show that the event was received.
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..b6359d4
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,108 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>net.lr.reactive.component</groupId>
+ <artifactId>rcomp-parent</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+
+ <properties>
+ </properties>
+
+ <modules>
+ <module>rcomp-api</module>
+ <module>rcomp-eventadmin</module>
+ <module>rcomp-mqtt</module>
+ <module>rcomp-mail</module>
+ <module>rcomp-examples</module>
+ </modules>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.reactivestreams</groupId>
+ <artifactId>reactive-streams</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.21</version>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>osgi.core</artifactId>
+ <version>6.0.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>osgi.cmpn</artifactId>
+ <version>6.0.0</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <version>1.7.21</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>net.lr.reactive.component</groupId>
+ <artifactId>rcomp-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ <version>3.0.7.RELEASE</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-maven-plugin</artifactId>
+ <version>3.3.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>bnd-process</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <archive>
+ <manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
+ </archive>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/rcomp-api/bnd.bnd b/rcomp-api/bnd.bnd
new file mode 100644
index 0000000..95fa3f2
--- /dev/null
+++ b/rcomp-api/bnd.bnd
@@ -0,0 +1 @@
+Export-Package: component.api
\ No newline at end of file
diff --git a/rcomp-api/pom.xml b/rcomp-api/pom.xml
new file mode 100644
index 0000000..587acbc
--- /dev/null
+++ b/rcomp-api/pom.xml
@@ -0,0 +1,9 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>net.lr.reactive.component</groupId>
+ <artifactId>rcomp-parent</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>rcomp-api</artifactId>
+</project>
\ No newline at end of file
diff --git a/rcomp-api/src/main/java/component/api/MComponent.java b/rcomp-api/src/main/java/component/api/MComponent.java
new file mode 100644
index 0000000..67d0ea8
--- /dev/null
+++ b/rcomp-api/src/main/java/component/api/MComponent.java
@@ -0,0 +1,9 @@
+package component.api;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
+public interface MComponent<T> {
+ Publisher<T> from(String destination);
+ Subscriber<T> to(String destination);
+}
diff --git a/rcomp-eventadmin/pom.xml b/rcomp-eventadmin/pom.xml
new file mode 100644
index 0000000..1ec74dc
--- /dev/null
+++ b/rcomp-eventadmin/pom.xml
@@ -0,0 +1,17 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>net.lr.reactive.component</groupId>
+ <artifactId>rcomp-parent</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>rcomp-eventadmin</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>net.lr.reactive.component</groupId>
+ <artifactId>rcomp-api</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java
new file mode 100644
index 0000000..5b264a0
--- /dev/null
+++ b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java
@@ -0,0 +1,44 @@
+package component.eventadmin;
+
+import java.util.Map;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
+import component.api.MComponent;
+
+@Component(property="name=eventAdmin")
+public class EventAdminComponent implements MComponent<Map<String, ?>> {
+
+ private BundleContext context;
+
+ @Reference
+ EventAdmin client;
+
+
+ @Activate
+ public void activate(BundleContext context) {
+ this.context = context;
+ }
+
+ @Deactivate
+ public void deactivate() {
+ }
+
+ @Override
+ public Publisher<Map<String, ?>> from(String topic) {
+ return new EventAdminSource(context, topic);
+ }
+
+ @Override
+ public Subscriber<Map<String, ?>> to(String topic) {
+ return new EventAdminDestination(client, topic);
+ }
+
+}
diff --git a/reactortest/src/main/java/component/eventadmin/EventAdminDestination.java b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java
similarity index 72%
rename from reactortest/src/main/java/component/eventadmin/EventAdminDestination.java
rename to rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java
index a1a8b96..0e7f667 100644
--- a/reactortest/src/main/java/component/eventadmin/EventAdminDestination.java
+++ b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java
@@ -1,24 +1,21 @@
package component.eventadmin;
import java.util.Map;
-import java.util.function.Function;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-public class EventAdminDestination<T> implements Subscriber<T> {
+public class EventAdminDestination implements Subscriber<Map<String, ?>> {
private EventAdmin client;
private String topic;
- private Function<T, Map<String, ?>> converter;
private Subscription subscription;
- public EventAdminDestination(EventAdmin client, String topic, Function<T, Map<String, ?>> converter) {
+ public EventAdminDestination(EventAdmin client, String topic) {
this.client = client;
this.topic = topic;
- this.converter = converter;
}
@Override
@@ -28,9 +25,9 @@
}
@Override
- public void onNext(T payload) {
+ public void onNext(Map<String, ?> payload) {
try {
- Event event = new Event(topic, converter.apply(payload));
+ Event event = new Event(topic, payload);
this.client.sendEvent(event);
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/reactortest/src/main/java/component/eventadmin/EventAdminSource.java b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java
similarity index 73%
rename from reactortest/src/main/java/component/eventadmin/EventAdminSource.java
rename to rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java
index 80bc7b7..bbb560b 100644
--- a/reactortest/src/main/java/component/eventadmin/EventAdminSource.java
+++ b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java
@@ -5,7 +5,6 @@
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
@@ -16,28 +15,26 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-public class EventAdminSource <T> implements Publisher<T> {
+public class EventAdminSource implements Publisher<Map<String, ?>> {
private BundleContext context;
- private Function<Map<String, ?>, T> converter;
private String topic;
- public EventAdminSource(BundleContext context, String topic, Function<Map<String, ?>, T> converter) {
+ public EventAdminSource(BundleContext context, String topic) {
this.context = context;
this.topic = topic;
- this.converter = converter;
}
@Override
- public void subscribe(Subscriber<? super T> subscriber) {
- subscriber.onSubscribe(new MqttSubscription(subscriber));
+ public void subscribe(Subscriber<? super Map<String, ?>> subscriber) {
+ subscriber.onSubscribe(new EventAdminSubscription(subscriber));
}
- public class MqttSubscription implements Subscription, EventHandler {
+ public class EventAdminSubscription implements Subscription, EventHandler {
private AtomicBoolean subScribed;
private ServiceRegistration<EventHandler> sreg;
- private Subscriber<? super T> subscriber;
+ private Subscriber<? super Map<String, ?>> subscriber;
- public MqttSubscription(Subscriber<? super T> subscriber) {
+ public EventAdminSubscription(Subscriber<? super Map<String, ?>> subscriber) {
this.subscriber = subscriber;
this.subScribed = new AtomicBoolean(false);
}
@@ -64,8 +61,7 @@
@Override
public void handleEvent(Event event) {
- T payLoad = converter.apply(toMap(event));
- this.subscriber.onNext(payLoad);
+ this.subscriber.onNext(toMap(event));
}
Map<String, ?> toMap(Event event) {
diff --git a/rcomp-examples/bnd.bnd b/rcomp-examples/bnd.bnd
new file mode 100644
index 0000000..8b6f883
--- /dev/null
+++ b/rcomp-examples/bnd.bnd
@@ -0,0 +1 @@
+Web-ContextPath:/rcomp-examples
diff --git a/rcomp-examples/pom.xml b/rcomp-examples/pom.xml
new file mode 100644
index 0000000..f726a2c
--- /dev/null
+++ b/rcomp-examples/pom.xml
@@ -0,0 +1,52 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>net.lr.reactive.component</groupId>
+ <artifactId>rcomp-parent</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>rcomp-examples</artifactId>
+ <!-- <repositories> <repository> <id>spring-milestones</id> <url>http://repo.spring.io/milestone</url>
+ <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled>
+ </snapshots> </repository> </repositories> -->
+
+ <dependencies>
+ <dependency>
+ <groupId>net.lr.reactive.component</groupId>
+ <artifactId>rcomp-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.addons</groupId>
+ <artifactId>reactor-extra</artifactId>
+ <version>3.0.7.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.addons</groupId>
+ <artifactId>reactor-adapter</artifactId>
+ <version>3.0.7.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-client</artifactId>
+ <version>3.2.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>8.1.15.v20140411</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>8.1.15.v20140411</version>
+ </dependency>
+
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/reactortest/src/main/java/reactortest/ByteArrayConverter.java b/rcomp-examples/src/main/java/reactortest/ByteArrayConverter.java
similarity index 100%
rename from reactortest/src/main/java/reactortest/ByteArrayConverter.java
rename to rcomp-examples/src/main/java/reactortest/ByteArrayConverter.java
diff --git a/reactortest/src/main/java/reactortest/DoubleConverter.java b/rcomp-examples/src/main/java/reactortest/DoubleConverter.java
similarity index 100%
rename from reactortest/src/main/java/reactortest/DoubleConverter.java
rename to rcomp-examples/src/main/java/reactortest/DoubleConverter.java
diff --git a/reactortest/src/main/java/reactortest/ExampleEventAdmin.java b/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
similarity index 84%
rename from reactortest/src/main/java/reactortest/ExampleEventAdmin.java
rename to rcomp-examples/src/main/java/reactortest/EventAdminExample.java
index 47ff7b7..f4e490d 100644
--- a/reactortest/src/main/java/reactortest/ExampleEventAdmin.java
+++ b/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
@@ -14,16 +14,16 @@
import reactor.core.publisher.Flux;
@Component(immediate=true)
-public class ExampleEventAdmin {
- Logger LOG = LoggerFactory.getLogger(ExampleEventAdmin.class);
+public class EventAdminExample {
+ Logger LOG = LoggerFactory.getLogger(EventAdminExample.class);
@Reference(target="(name=eventAdmin)")
MComponent<Map<String, ? >> eventAdmin;
@Activate
public void start() throws Exception {
- Publisher<Map<String, ?>> fromTopic = eventAdmin.from("input", Map2Map::convert);
- Subscriber<Map<String, ?>> toTopic = eventAdmin.to("output", Map2Map::convert);
+ Publisher<Map<String, ?>> fromTopic = eventAdmin.from("input");
+ Subscriber<Map<String, ?>> toTopic = eventAdmin.to("output");
Flux.from(fromTopic)
.log()
.subscribe(toTopic);
diff --git a/reactortest/src/main/java/reactortest/MqttExampleComponent.java b/rcomp-examples/src/main/java/reactortest/MqttExample.java
similarity index 73%
rename from reactortest/src/main/java/reactortest/MqttExampleComponent.java
rename to rcomp-examples/src/main/java/reactortest/MqttExample.java
index 8d079bd..feb214e 100644
--- a/reactortest/src/main/java/reactortest/MqttExampleComponent.java
+++ b/rcomp-examples/src/main/java/reactortest/MqttExample.java
@@ -13,8 +13,8 @@
import reactor.math.MathFlux;
@Component(immediate=true)
-public class MqttExampleComponent {
- Logger LOG = LoggerFactory.getLogger(MqttExampleComponent.class);
+public class MqttExample {
+ Logger LOG = LoggerFactory.getLogger(MqttExample.class);
@Reference(target="(name=mqtt)")
MComponent<byte[]> mqtt;
@@ -22,12 +22,14 @@
@Activate
public void start() throws Exception {
LOG.info("Starting mqtt test component");
- Publisher<Integer> fromTopic = mqtt.from("input", ByteArrayConverter::asInteger);
- Subscriber<Double> toTopic = mqtt.to("output", DoubleConverter::asByteAr);
+ Publisher<byte[]> fromTopic = mqtt.from("input");
+ Subscriber<byte[]> toTopic = mqtt.to("output");
Flux.from(fromTopic)
+ .map(ByteArrayConverter::asInteger)
.log()
.window(2, 1)
.flatMap(win -> MathFlux.averageDouble(win))
+ .map(DoubleConverter::asByteAr)
.subscribe(toTopic);
LOG.info("mqtt test component started");
}
diff --git a/rcomp-examples/src/main/resources/1.txt b/rcomp-examples/src/main/resources/1.txt
new file mode 100644
index 0000000..5ab2f8a
--- /dev/null
+++ b/rcomp-examples/src/main/resources/1.txt
@@ -0,0 +1 @@
+Hello
\ No newline at end of file
diff --git a/rcomp-examples/src/main/resources/2.txt b/rcomp-examples/src/main/resources/2.txt
new file mode 100644
index 0000000..beef906
--- /dev/null
+++ b/rcomp-examples/src/main/resources/2.txt
@@ -0,0 +1 @@
+World
\ No newline at end of file
diff --git a/rcomp-examples/src/test/java/examples/Test1.java b/rcomp-examples/src/test/java/examples/Test1.java
new file mode 100644
index 0000000..808ead6
--- /dev/null
+++ b/rcomp-examples/src/test/java/examples/Test1.java
@@ -0,0 +1,36 @@
+package examples;
+
+
+import static java.time.Duration.of;
+
+import java.time.temporal.ChronoUnit;
+import java.util.function.Function;
+
+import org.junit.Test;
+
+import reactor.core.publisher.Flux;
+import reactor.math.MathFlux;
+
+public class Test1 {
+ double result = 0;
+
+ @Test
+ public void testStream() {
+ Flux<Integer> flux = Flux.fromArray(new Integer[]{1,10,5,3,4});
+ MathFlux.averageDouble(flux).subscribe(System.out::println);
+ }
+
+ @Test
+ public void testSlidingWindow() throws InterruptedException {
+ Flux.interval(of(1, ChronoUnit.MILLIS))
+ .transform(averageOfLastTwo())
+ .subscribe(System.out::println);
+ Thread.sleep(100);
+ }
+
+ private Function<Flux<Long>, Flux<Double>> averageOfLastTwo() {
+ return f -> f.window(2, 1)
+ .flatMap(win -> MathFlux.averageDouble(win));
+ }
+
+}
diff --git a/rcomp-examples/src/test/java/examples/TestRs.java b/rcomp-examples/src/test/java/examples/TestRs.java
new file mode 100644
index 0000000..96aa543
--- /dev/null
+++ b/rcomp-examples/src/test/java/examples/TestRs.java
@@ -0,0 +1,73 @@
+package examples;
+
+import static reactor.core.publisher.Mono.fromCompletionStage;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import javax.servlet.Servlet;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.servlet.ServletHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.Test;
+
+import reactor.core.publisher.Mono;
+
+public class TestRs {
+
+ @Test
+ public void testGet() throws Exception {
+ Server server = createServer();
+ server.start();
+ WebTarget client1 = ClientBuilder.newClient().target("http://localhost:8384/Hello");
+ WebTarget client2 = ClientBuilder.newClient().target("http://localhost:8384/World");
+ Mono<String> get1 = fromCompletionStage(client1.request().rx().get(String.class));
+ Mono<String> get2 = fromCompletionStage(client2.request().rx().get(String.class));
+ List<String> result = Mono
+ .from(get1)
+ .concatWith(get2)
+ .doOnError(ex -> ex.printStackTrace())
+ .collectList()
+ .block(Duration.ofMillis(1500));
+ String resultSt = result.stream().collect(Collectors.joining(" "));
+ org.junit.Assert.assertEquals("Hello World", resultSt);
+ server.stop();
+ }
+
+ private Server createServer() {
+ Server server = new Server();
+ SelectChannelConnector connector = new SelectChannelConnector();
+ connector.setPort(8384);
+ server.addConnector(connector);
+
+ ServletHandler handler = new ServletHandler();
+ Servlet servlet = new HttpServlet() {
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+ String path = req.getServletPath().substring(1);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ resp.getWriter().append(path);
+ }
+
+ };
+ handler.addServletWithMapping(new ServletHolder(servlet), "/");
+ server.setHandler(handler);
+ return server;
+ }
+
+}
diff --git a/rcomp-mail/pom.xml b/rcomp-mail/pom.xml
new file mode 100644
index 0000000..3bfb5aa
--- /dev/null
+++ b/rcomp-mail/pom.xml
@@ -0,0 +1,32 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>net.lr.reactive.component</groupId>
+ <artifactId>rcomp-parent</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>rcomp-mail</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>net.lr.reactive.component</groupId>
+ <artifactId>rcomp-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>javax.mail</groupId>
+ <artifactId>javax.mail-api</artifactId>
+ <version>1.5.6</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.mail</groupId>
+ <artifactId>mail</artifactId>
+ <version>1.5.0-b01</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/rcomp-mail/src/main/java/component/mail/MailComponent.java b/rcomp-mail/src/main/java/component/mail/MailComponent.java
new file mode 100644
index 0000000..81986c2
--- /dev/null
+++ b/rcomp-mail/src/main/java/component/mail/MailComponent.java
@@ -0,0 +1,29 @@
+package component.mail;
+
+import javax.mail.Session;
+import javax.mail.internet.MimeMessage;
+
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
+import component.api.MComponent;
+
+@Component(property="name=mail")
+public class MailComponent implements MComponent<MimeMessage> {
+
+ @Reference
+ Session session;
+
+ @Override
+ public Publisher<MimeMessage> from(String topic) {
+ throw new RuntimeException();
+ }
+
+ @Override
+ public Subscriber<MimeMessage> to(String destination) {
+ return new MailDestination(destination);
+ }
+
+}
diff --git a/reactortest/src/main/java/component/mail/MailDestination.java b/rcomp-mail/src/main/java/component/mail/MailDestination.java
similarity index 71%
rename from reactortest/src/main/java/component/mail/MailDestination.java
rename to rcomp-mail/src/main/java/component/mail/MailDestination.java
index 33dd1da..d9ebf27 100644
--- a/reactortest/src/main/java/component/mail/MailDestination.java
+++ b/rcomp-mail/src/main/java/component/mail/MailDestination.java
@@ -1,7 +1,5 @@
package component.mail;
-import java.util.function.Function;
-
import javax.mail.Address;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
@@ -10,15 +8,13 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-public class MailDestination<T> implements Subscriber<T> {
+public class MailDestination implements Subscriber<MimeMessage> {
private String destination;
- private Function<T, MimeMessage> converter;
private Subscription subscription;
- public MailDestination(String destination, Function<T, MimeMessage> converter) {
+ public MailDestination(String destination) {
this.destination = destination;
- this.converter = converter;
}
@Override
@@ -28,9 +24,8 @@
}
@Override
- public void onNext(T payload) {
+ public void onNext(MimeMessage message) {
try {
- MimeMessage message = converter.apply(payload);
Address[] addresses = new Address[]{new InternetAddress(destination)};
Transport.send(message, addresses);
} catch (Exception e) {
diff --git a/reactortest/src/main/java/component/mail/SessionComponent.java b/rcomp-mail/src/main/java/component/mail/SessionComponent.java
similarity index 92%
rename from reactortest/src/main/java/component/mail/SessionComponent.java
rename to rcomp-mail/src/main/java/component/mail/SessionComponent.java
index 9ade977..d3be1dc 100644
--- a/reactortest/src/main/java/component/mail/SessionComponent.java
+++ b/rcomp-mail/src/main/java/component/mail/SessionComponent.java
@@ -5,7 +5,6 @@
import javax.mail.Session;
-import org.eclipse.paho.client.mqttv3.MqttException;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
@@ -18,7 +17,7 @@
private ServiceRegistration<Session> sreg;
@Activate
- public void activate(Map<String, Object> config, BundleContext context) throws MqttException {
+ public void activate(Map<String, Object> config, BundleContext context) {
Session session = create(config);
sreg = context.registerService(Session.class, session, null);
}
diff --git a/reactortest/src/test/java/component/mail/TestMail.java b/rcomp-mail/src/test/java/component/mail/TestMail.java
similarity index 87%
rename from reactortest/src/test/java/component/mail/TestMail.java
rename to rcomp-mail/src/test/java/component/mail/TestMail.java
index a37fec0..d27e4c0 100644
--- a/reactortest/src/test/java/component/mail/TestMail.java
+++ b/rcomp-mail/src/test/java/component/mail/TestMail.java
@@ -25,8 +25,8 @@
MailComponent mail = new MailComponent();
mail.session = session;
- Subscriber<String> to = mail.to("cschneider@localhost", txt -> createMessage(session, txt));
- Flux.just("Test").subscribe(to);
+ Subscriber<MimeMessage> to = mail.to("cschneider@localhost");
+ Flux.just("Test").map(txt -> createMessage(session, txt)).subscribe(to);
}
private MimeMessage createMessage(Session session, String body) {
diff --git a/rcomp-mqtt/pom.xml b/rcomp-mqtt/pom.xml
new file mode 100644
index 0000000..080ae04
--- /dev/null
+++ b/rcomp-mqtt/pom.xml
@@ -0,0 +1,25 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>net.lr.reactive.component</groupId>
+ <artifactId>rcomp-parent</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>rcomp-mqtt</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>net.lr.reactive.component</groupId>
+ <artifactId>rcomp-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/reactortest/src/main/java/component/mqtt/MqttComponent.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
similarity index 76%
rename from reactortest/src/main/java/component/mqtt/MqttComponent.java
rename to rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
index a4c3b46..f35f492 100644
--- a/reactortest/src/main/java/component/mqtt/MqttComponent.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
@@ -1,7 +1,5 @@
package component.mqtt;
-import java.util.function.Function;
-
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.osgi.service.component.annotations.Activate;
@@ -37,13 +35,13 @@
}
@Override
- public <T> Publisher<T> from(String topic, Function<byte[], T> converter) throws Exception {
- return new MqttSource<T>(client, topic, converter);
+ public Publisher<byte[]> from(String topic) {
+ return new MqttSource(client, topic);
}
@Override
- public <T> Subscriber<T> to(String topic, Function<T, byte[]> converter) throws Exception {
- return new MqttDestination<T>(client, topic, converter);
+ public Subscriber<byte[]> to(String topic) {
+ return new MqttDestination(client, topic);
}
}
diff --git a/reactortest/src/main/java/component/mqtt/MqttDestination.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
similarity index 70%
rename from reactortest/src/main/java/component/mqtt/MqttDestination.java
rename to rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
index d5a874d..daa0c57 100644
--- a/reactortest/src/main/java/component/mqtt/MqttDestination.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
@@ -1,23 +1,19 @@
package component.mqtt;
-import java.util.function.Function;
-
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-public class MqttDestination<T> implements Subscriber<T> {
+public class MqttDestination implements Subscriber<byte[]> {
private MqttClient client;
private String topic;
- private Function<T, byte[]> converter;
private Subscription subscription;
- public MqttDestination(MqttClient client, String topic, Function<T, byte[]> converter) {
+ public MqttDestination(MqttClient client, String topic) {
this.client = client;
this.topic = topic;
- this.converter = converter;
}
@Override
@@ -27,9 +23,9 @@
}
@Override
- public void onNext(T payload) {
+ public void onNext(byte[] payload) {
try {
- MqttMessage message = new MqttMessage(converter.apply(payload));
+ MqttMessage message = new MqttMessage(payload);
this.client.publish(topic, message);
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/reactortest/src/main/java/component/mqtt/MqttSource.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java
similarity index 74%
rename from reactortest/src/main/java/component/mqtt/MqttSource.java
rename to rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java
index 6e72acd..5b81963 100644
--- a/reactortest/src/main/java/component/mqtt/MqttSource.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java
@@ -1,7 +1,6 @@
package component.mqtt;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -11,27 +10,25 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-public class MqttSource <T> implements Publisher<T> {
- private Function<byte[], T> converter;
+public class MqttSource implements Publisher<byte[]> {
private MqttClient client;
private String topic;
- public MqttSource(MqttClient client, String topic, Function<byte[], T> converter) {
+ public MqttSource(MqttClient client, String topic) {
this.client = client;
this.topic = topic;
- this.converter = converter;
}
@Override
- public void subscribe(Subscriber<? super T> subscriber) {
+ public void subscribe(Subscriber<? super byte[]> subscriber) {
subscriber.onSubscribe(new MqttSubscription(subscriber));
}
public class MqttSubscription implements IMqttMessageListener, Subscription {
private AtomicBoolean subScribed;
- private Subscriber<? super T> subscriber;
+ private Subscriber<? super byte[]> subscriber;
- public MqttSubscription(Subscriber<? super T> subscriber) {
+ public MqttSubscription(Subscriber<? super byte[]> subscriber) {
this.subscriber = subscriber;
this.subScribed = new AtomicBoolean(false);
}
@@ -60,8 +57,7 @@
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
- T payLoad = converter.apply(message.getPayload());
- this.subscriber.onNext(payLoad);
+ this.subscriber.onNext(message.getPayload());
}
}
diff --git a/rcomp-mqtt/src/test/java/component/mqtt/Test1.java b/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
new file mode 100644
index 0000000..32a1fe0
--- /dev/null
+++ b/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
@@ -0,0 +1,43 @@
+package component.mqtt;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.junit.Assert;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
+import reactor.core.publisher.Flux;
+
+public class Test1 {
+ Integer result = 0;
+
+ @Test
+ public void testMQtt() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ MqttClient client = new MqttClient("tcp://192.168.0.126:1883", MqttClient.generateClientId(), new MemoryPersistence());
+ client.connect();
+ MqttComponent mqtt = new MqttComponent();
+ mqtt.client = client;
+ Publisher<byte[]> fromTopic = mqtt.from("input");
+ Subscriber<byte[]> toTopic = mqtt.to("output");
+ Flux.from(fromTopic)
+ .log()
+ .subscribe(toTopic);
+
+ client.subscribe("output", (topic, message) -> {
+ result = new Integer(new String(message.getPayload()));
+ latch.countDown();
+ });
+ client.publish("input", new MqttMessage(new Integer(2).toString().getBytes()));
+ client.publish("input", new MqttMessage(new Integer(2).toString().getBytes()));
+ latch.await(100, TimeUnit.SECONDS);
+ Assert.assertEquals(2, result, 0.1);
+ client.disconnect();
+ client.close();
+ }
+}
diff --git a/reactortest/pom.xml b/reactortest/pom.xml
deleted file mode 100644
index 7029440..0000000
--- a/reactortest/pom.xml
+++ /dev/null
@@ -1,129 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>net.lr</groupId>
- <artifactId>reactortest</artifactId>
- <version>0.0.1-SNAPSHOT</version>
-
- <repositories>
- <repository>
- <id>spring-milestones</id>
- <url>http://repo.spring.io/milestone</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
-
- <build>
- <plugins>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.5.1</version>
- <configuration>
- <!-- http://maven.apache.org/plugins/maven-compiler-plugin/ -->
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>biz.aQute.bnd</groupId>
- <artifactId>bnd-maven-plugin</artifactId>
- <version>3.2.0</version>
- <executions>
- <execution>
- <goals>
- <goal>bnd-process</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.5</version>
- <configuration>
- <archive>
- <manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
- </archive>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.osgi</groupId>
- <artifactId>osgi.core</artifactId>
- <version>6.0.0</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.osgi</groupId>
- <artifactId>osgi.cmpn</artifactId>
- <version>6.0.0</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.7</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-core</artifactId>
- <version>3.0.7.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>io.projectreactor.addons</groupId>
- <artifactId>reactor-extra</artifactId>
- <version>3.0.7.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>io.projectreactor.addons</groupId>
- <artifactId>reactor-adapter</artifactId>
- <version>3.0.7.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>io.projectreactor.kafka</groupId>
- <artifactId>reactor-kafka</artifactId>
- <version>1.0.0.M2</version>
- </dependency>
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
- <version>1.1.1</version>
- </dependency>
- <dependency>
- <groupId>javax.mail</groupId>
- <artifactId>javax.mail-api</artifactId>
- <version>1.5.6</version>
- </dependency>
- <dependency>
- <groupId>javax.mail</groupId>
- <artifactId>mail</artifactId>
- <version>1.5.0-b01</version>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.12</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- <version>1.7.7</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
\ No newline at end of file
diff --git a/reactortest/src/main/java/component/api/MComponent.java b/reactortest/src/main/java/component/api/MComponent.java
deleted file mode 100644
index 7f7027c..0000000
--- a/reactortest/src/main/java/component/api/MComponent.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package component.api;
-
-import java.util.function.Function;
-
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-
-public interface MComponent<F> {
- <T> Publisher<T> from(String destination, Function<F, T> converter) throws Exception;
- <T> Subscriber<T> to(String destination, Function<T, F> converter) throws Exception;
-}
diff --git a/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java b/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java
deleted file mode 100644
index c2ce58f..0000000
--- a/reactortest/src/main/java/component/eventadmin/EventAdminComponent.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package component.eventadmin;
-
-import java.util.Map;
-import java.util.function.Function;
-
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.event.EventAdmin;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-
-import component.api.MComponent;
-
-@Component(property="name=eventAdmin")
-public class EventAdminComponent implements MComponent<Map<String, ?>> {
-
- private BundleContext context;
-
- @Reference
- EventAdmin client;
-
-
- @Activate
- public void activate(BundleContext context) throws MqttException {
- this.context = context;
- }
-
- @Deactivate
- public void deactivate() throws MqttException {
- }
-
- @Override
- public <T> Publisher<T> from(String topic, Function<Map<String, ?>, T> converter) throws Exception {
- return new EventAdminSource<T>(context, topic, converter);
- }
-
- @Override
- public <T> Subscriber<T> to(String topic, Function<T, Map<String, ?>> converter) throws Exception {
- return new EventAdminDestination<T>(client, topic, converter);
- }
-
-}
diff --git a/reactortest/src/main/java/component/mail/MailComponent.java b/reactortest/src/main/java/component/mail/MailComponent.java
deleted file mode 100644
index a932271..0000000
--- a/reactortest/src/main/java/component/mail/MailComponent.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package component.mail;
-
-import java.util.function.Function;
-
-import javax.mail.MethodNotSupportedException;
-import javax.mail.Session;
-import javax.mail.internet.MimeMessage;
-
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-
-import component.api.MComponent;
-
-@Component(property="name=mail")
-public class MailComponent implements MComponent<MimeMessage> {
-
- @Reference
- Session session;
-
- @Override
- public <T> Publisher<T> from(String topic, Function<MimeMessage, T> converter) throws Exception {
- throw new MethodNotSupportedException();
- }
-
- @Override
- public <T> Subscriber<T> to(String destination, Function<T, MimeMessage> converter) throws Exception {
- return new MailDestination<T>(destination, converter);
- }
-
-}
diff --git a/reactortest/src/main/java/reactortest/Map2Map.java b/reactortest/src/main/java/reactortest/Map2Map.java
deleted file mode 100644
index 3acc724..0000000
--- a/reactortest/src/main/java/reactortest/Map2Map.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package reactortest;
-
-import java.util.Map;
-
-public class Map2Map {
- public static Map<String, ?> convert(Map<String, ?> source) {
- return source;
- }
-}
diff --git a/reactortest/src/test/java/component/mqtt/Test1.java b/reactortest/src/test/java/component/mqtt/Test1.java
deleted file mode 100644
index 2c15d1f..0000000
--- a/reactortest/src/test/java/component/mqtt/Test1.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package component.mqtt;
-
-import static java.time.Duration.of;
-
-import java.time.temporal.ChronoUnit;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.junit.Assert;
-import org.junit.Test;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-
-import reactor.core.publisher.Flux;
-import reactor.math.MathFlux;
-import reactortest.ByteArrayConverter;
-import reactortest.DoubleConverter;
-
-public class Test1 {
- double result = 0;
-
- @Test
- public void testStream() {
- Flux<Integer> flux = Flux.fromArray(new Integer[]{1,10,5,3,4});
- MathFlux.averageDouble(flux).subscribe(System.out::println);
- }
-
- @Test
- public void testSlidingWindow() throws InterruptedException {
- Flux.interval(of(1, ChronoUnit.MILLIS))
- .transform(averageOfLastTwo())
- .subscribe(System.out::println);
- Thread.sleep(100);
- }
-
- private Function<Flux<Long>, Flux<Double>> averageOfLastTwo() {
- return f -> f.window(2, 1)
- .flatMap(win -> MathFlux.averageDouble(win));
- }
-
- @Test
- public void testMQtt() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
- MqttClient client = new MqttClient("tcp://192.168.0.126:1883", MqttClient.generateClientId());
- client.connect();
- MqttComponent mqtt = new MqttComponent();
- mqtt.client = client;
- Publisher<Integer> fromTopic = mqtt.from("input", ByteArrayConverter::asInteger);
- Subscriber<Double> toTopic = mqtt.to("output", DoubleConverter::asByteAr);
- Flux.from(fromTopic)
- .log()
- .window(2, 1)
- .flatMap(win -> MathFlux.averageDouble(win))
- .log()
- .subscribe(toTopic);
-
- client.subscribe("output", (topic, message) -> {
- result = ByteArrayConverter.asDouble(message.getPayload());
- latch.countDown();
- });
- client.publish("input", new MqttMessage(ByteArrayConverter.fromInteger(2)));
- client.publish("input", new MqttMessage(new Integer(2).toString().getBytes()));
- latch.await(10, TimeUnit.SECONDS);
- Assert.assertEquals(2, result, 0.1);
- client.disconnect();
- client.close();
- }
-}