Some fixes and added documentation
diff --git a/reactortest/README.md b/reactortest/README.md
new file mode 100644
index 0000000..eb2be07
--- /dev/null
+++ b/reactortest/README.md
@@ -0,0 +1,35 @@
+# Example for combining reactor and mqtt
+
+The example [MqttExampleComponent](src/main/java/reactortest/MqttExampleComponent.java) receives Integers from the topic "input",
+computes the average over a sliding window of 2 elements and writes the results to the topic "output".
+
+This example show how to combine reactor with an protocols in a loosely coupled way that does not strictly couple your user code to the protocol.
+
+## Build
+
+mvn clean install
+
+## Environment
+
+Install and start a MQTT server. I recommend using mosquitto.
+You also need a MQTT client.
+
+## Install
+
+config:property-set -p component.mqtt.MqttComponent serverUrl tcp://localhost:1883
+install -s mvn:org.eclipse.paho/org.eclipse.paho.client.mqttv3/1.1.1
+install -s mvn:org.reactivestreams/reactive-streams/1.0.0
+install -s mvn:io.projectreactor/reactor-core/3.0.7.RELEASE
+install -s wrap:mvn:io.projectreactor.addons/reactor-extra/3.0.7.RELEASE
+install -s mvn:net.lr/reactortest/0.0.1-SNAPSHOT
+
+## Test
+
+Start mqtt client
+
+Subscribe to topic "output".
+Send two messages containing the values "1", "2" and "3" to the topic "input".
+
+You should receive the following on the topic output: "1.5", "2.5"
+
+
diff --git a/reactortest/pom.xml b/reactortest/pom.xml
index a2a84bc..bd27829 100644
--- a/reactortest/pom.xml
+++ b/reactortest/pom.xml
@@ -5,6 +5,19 @@
<artifactId>reactortest</artifactId>
<version>0.0.1-SNAPSHOT</version>
+ <repositories>
+ <repository>
+ <id>spring-milestones</id>
+ <url>http://repo.spring.io/milestone</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
<build>
<plugins>
@@ -50,8 +63,8 @@
<version>6.0.0</version>
<scope>provided</scope>
</dependency>
-
-
+
+
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
@@ -68,6 +81,11 @@
<version>3.0.7.RELEASE</version>
</dependency>
<dependency>
+ <groupId>io.projectreactor.kafka</groupId>
+ <artifactId>reactor-kafka</artifactId>
+ <version>1.0.0.M2</version>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.1</version>
diff --git a/reactortest/src/main/java/component/mqtt/MqttComponent.java b/reactortest/src/main/java/component/mqtt/MqttComponent.java
index ff5d229..62929e5 100644
--- a/reactortest/src/main/java/component/mqtt/MqttComponent.java
+++ b/reactortest/src/main/java/component/mqtt/MqttComponent.java
@@ -38,8 +38,7 @@
@Override
public <T> Publisher<T> from(String topic, Function<byte[], T> converter) throws Exception {
- MqttSource<T> source = new MqttSource<T>(client, topic, converter);
- return source;
+ return new MqttSource<T>(client, topic, converter);
}
@Override
diff --git a/reactortest/src/main/java/reactortest/MqttExampleComponent.java b/reactortest/src/main/java/reactortest/MqttExampleComponent.java
index 843e6fa..30081e3 100644
--- a/reactortest/src/main/java/reactortest/MqttExampleComponent.java
+++ b/reactortest/src/main/java/reactortest/MqttExampleComponent.java
@@ -6,6 +6,8 @@
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import component.api.MComponent;
import reactor.core.publisher.Flux;
@@ -13,17 +15,22 @@
@Component(immediate=true)
public class MqttExampleComponent {
+ Logger LOG = LoggerFactory.getLogger(MqttExampleComponent.class);
+
@Reference(target="(name=mqtt)")
MComponent mqtt;
@Activate
public void start() throws Exception {
+ LOG.info("Starting mqtt test component");
Publisher<Integer> fromTopic = mqtt.from("input", ByteArrayConverter::asInteger);
- Consumer<Double> toTopic = mqtt.to("test", DoubleConverter::asByteAr);
+ Consumer<Double> toTopic = mqtt.to("output", DoubleConverter::asByteAr);
Flux.from(fromTopic)
+ .log()
.window(2, 1)
.flatMap(win -> MathFlux.averageDouble(win))
.subscribe(toTopic);
+ LOG.info("mqtt test component started");
}
}