Add embedded mqtt server for test
diff --git a/rcomp-mqtt/pom.xml b/rcomp-mqtt/pom.xml
index 3b6879e..89d7512 100644
--- a/rcomp-mqtt/pom.xml
+++ b/rcomp-mqtt/pom.xml
@@ -25,6 +25,7 @@
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>rcomp-mqtt</artifactId>
+
<dependencies>
<dependency>
<groupId>net.lr.reactive.component</groupId>
@@ -39,5 +40,18 @@
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <version>5.15.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-mqtt</artifactId>
+ <version>5.15.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/rcomp-mqtt/src/test/java/org/apache/karaf/rcomp/mqtt/Test1.java b/rcomp-mqtt/src/test/java/org/apache/karaf/rcomp/mqtt/MqttTest.java
similarity index 76%
rename from rcomp-mqtt/src/test/java/org/apache/karaf/rcomp/mqtt/Test1.java
rename to rcomp-mqtt/src/test/java/org/apache/karaf/rcomp/mqtt/MqttTest.java
index 45571dc..3b73a08 100644
--- a/rcomp-mqtt/src/test/java/org/apache/karaf/rcomp/mqtt/Test1.java
+++ b/rcomp-mqtt/src/test/java/org/apache/karaf/rcomp/mqtt/MqttTest.java
@@ -19,24 +19,36 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.karaf.rcomp.mqtt.MqttComponent;
+import org.apache.activemq.broker.BrokerService;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
-public class Test1 {
+public class MqttTest {
+ private static final String MQTT_PORT = "9141";
Integer result = 0;
+ private BrokerService broker;
+
+ @Before
+ public void startBroker() throws Exception {
+ broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.addConnector("mqtt://localhost:" + MQTT_PORT);
+ broker.start();
+ }
@Test
public void testMQtt() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- MqttClient client = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId(), new MemoryPersistence());
+ MqttClient client = new MqttClient("tcp://localhost:" + MQTT_PORT, MqttClient.generateClientId(), new MemoryPersistence());
client.connect();
MqttComponent mqtt = new MqttComponent();
mqtt.client = client;
@@ -57,4 +69,9 @@
client.disconnect();
client.close();
}
+
+ @After
+ public void stopBroker() throws Exception {
+ broker.stop();
+ }
}