blob: f1e0a2304542f95dcb0ef0e0b9d2b150a02c7cca [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.mqtt;
import java.io.Serializable;
import java.net.URI;
import java.util.Arrays;
import org.apache.activemq.broker.BrokerService;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.mqtt.bolt.MqttBolt;
import org.apache.storm.mqtt.common.MqttOptions;
import org.apache.storm.mqtt.common.MqttPublisher;
import org.apache.storm.mqtt.mappers.StringMessageMapper;
import org.apache.storm.mqtt.spout.MqttSpout;
import org.apache.storm.testing.IntegrationTest;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.ITuple;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@IntegrationTest
public class StormMqttIntegrationTest implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(StormMqttIntegrationTest.class);
private static final String TEST_TOPIC = "/mqtt-topology";
private static final String RESULT_TOPIC = "/integration-result";
private static final String RESULT_PAYLOAD = "Storm MQTT Spout";
static boolean spoutActivated = false;
private static BrokerService broker;
@AfterAll
public static void cleanup() throws Exception {
broker.stop();
}
@BeforeAll
public static void start() throws Exception {
LOG.warn("Starting broker...");
broker = new BrokerService();
broker.addConnector("mqtt://localhost:1883");
broker.setDataDirectory("target");
broker.start();
LOG.debug("MQTT broker started");
}
@Test
public void testMqttTopology() throws Exception {
MQTT client = new MQTT();
client.setTracer(new MqttLogger());
URI uri = URI.create("tcp://localhost:1883");
client.setHost(uri);
client.setClientId("MQTTSubscriber");
client.setCleanSession(false);
BlockingConnection connection = client.blockingConnection();
connection.connect();
Topic[] topics = { new Topic("/integration-result", QoS.AT_LEAST_ONCE) };
byte[] qoses = connection.subscribe(topics);
try (LocalCluster cluster = new LocalCluster();
LocalTopology topo = cluster.submitTopology("test", new Config(), buildMqttTopology());) {
LOG.info("topology started");
while (!spoutActivated) {
Thread.sleep(500);
}
// publish a retained message to the broker
MqttOptions options = new MqttOptions();
options.setCleanConnection(false);
MqttPublisher publisher = new MqttPublisher(options, true);
publisher.connectMqtt("MqttPublisher");
publisher.publish(new MqttMessage(TEST_TOPIC, "test".getBytes()));
LOG.info("published message");
Message message = connection.receive();
LOG.info("Message recieved on topic: {}", message.getTopic());
LOG.info("Payload: {}", new String(message.getPayload()));
message.ack();
Assert.assertArrayEquals(message.getPayload(), RESULT_PAYLOAD.getBytes());
Assert.assertEquals(message.getTopic(), RESULT_TOPIC);
}
}
public StormTopology buildMqttTopology() {
TopologyBuilder builder = new TopologyBuilder();
MqttOptions options = new MqttOptions();
options.setTopics(Arrays.asList(TEST_TOPIC));
options.setCleanConnection(false);
TestSpout spout = new TestSpout(new StringMessageMapper(), options);
MqttBolt bolt = new MqttBolt(options, new MqttTupleMapper() {
@Override
public MqttMessage toMessage(ITuple tuple) {
LOG.info("Received: {}", tuple);
return new MqttMessage(RESULT_TOPIC, RESULT_PAYLOAD.getBytes());
}
});
builder.setSpout("mqtt-spout", spout);
builder.setBolt("mqtt-bolt", bolt).shuffleGrouping("mqtt-spout");
return builder.createTopology();
}
public static class TestSpout extends MqttSpout {
public TestSpout(MqttMessageMapper type, MqttOptions options) {
super(type, options);
}
@Override
public void activate() {
super.activate();
LOG.info("Spout activated.");
spoutActivated = true;
}
}
}