blob: 57725b4a486728a18823c99576714988c7d55699 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.mqtt;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.testing.IntegrationTest;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.ITuple;
import org.apache.activemq.broker.BrokerService;
import org.apache.storm.mqtt.bolt.MqttBolt;
import org.apache.storm.mqtt.common.MqttOptions;
import org.apache.storm.mqtt.common.MqttPublisher;
import org.apache.storm.mqtt.mappers.StringMessageMapper;
import org.apache.storm.mqtt.spout.MqttSpout;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.net.URI;
import java.util.Arrays;
@Category(IntegrationTest.class)
public class StormMqttIntegrationTest implements Serializable{
private static final Logger LOG = LoggerFactory.getLogger(StormMqttIntegrationTest.class);
private static BrokerService broker;
static boolean spoutActivated = false;
private static final String TEST_TOPIC = "/mqtt-topology";
private static final String RESULT_TOPIC = "/integration-result";
private static final String RESULT_PAYLOAD = "Storm MQTT Spout";
public static class TestSpout extends MqttSpout{
public TestSpout(MqttMessageMapper type, MqttOptions options){
super(type, options);
}
@Override
public void activate() {
super.activate();
LOG.info("Spout activated.");
spoutActivated = true;
}
}
@AfterClass
public static void cleanup() throws Exception {
broker.stop();
}
@BeforeClass
public static void start() throws Exception {
LOG.warn("Starting broker...");
broker = new BrokerService();
broker.addConnector("mqtt://localhost:1883");
broker.setDataDirectory("target");
broker.start();
LOG.debug("MQTT broker started");
}
@Test
public void testMqttTopology() throws Exception {
MQTT client = new MQTT();
client.setTracer(new MqttLogger());
URI uri = URI.create("tcp://localhost:1883");
client.setHost(uri);
client.setClientId("MQTTSubscriber");
client.setCleanSession(false);
BlockingConnection connection = client.blockingConnection();
connection.connect();
Topic[] topics = {new Topic("/integration-result", QoS.AT_LEAST_ONCE)};
byte[] qoses = connection.subscribe(topics);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", new Config(), buildMqttTopology());
LOG.info("topology started");
while(!spoutActivated) {
Thread.sleep(500);
}
// publish a retained message to the broker
MqttOptions options = new MqttOptions();
options.setCleanConnection(false);
MqttPublisher publisher = new MqttPublisher(options, true);
publisher.connectMqtt("MqttPublisher");
publisher.publish(new MqttMessage(TEST_TOPIC, "test".getBytes()));
LOG.info("published message");
Message message = connection.receive();
LOG.info("Message recieved on topic: {}", message.getTopic());
LOG.info("Payload: {}", new String(message.getPayload()));
message.ack();
Assert.assertArrayEquals(message.getPayload(), RESULT_PAYLOAD.getBytes());
Assert.assertEquals(message.getTopic(), RESULT_TOPIC);
cluster.shutdown();
}
public StormTopology buildMqttTopology(){
TopologyBuilder builder = new TopologyBuilder();
MqttOptions options = new MqttOptions();
options.setTopics(Arrays.asList(TEST_TOPIC));
options.setCleanConnection(false);
TestSpout spout = new TestSpout(new StringMessageMapper(), options);
MqttBolt bolt = new MqttBolt(options, new MqttTupleMapper() {
@Override
public MqttMessage toMessage(ITuple tuple) {
LOG.info("Received: {}", tuple);
return new MqttMessage(RESULT_TOPIC, RESULT_PAYLOAD.getBytes());
}
});
builder.setSpout("mqtt-spout", spout);
builder.setBolt("mqtt-bolt", bolt).shuffleGrouping("mqtt-spout");
return builder.createTopology();
}
}