blob: 7afe22cfd862160b56ee68da0ef448fc85dc6938 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.mqtt;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.*;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.common.util.BaseOperator;
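/**
 * Functional test for {@link AbstractSinglePortMqttOutputOperator}: a small local DAG publishes
 * topic/payload pairs through the operator while a subscriber thread collects what arrives, and
 * the test then compares the two. The test connects to the host and port configured in
 * {@code testDag} (localhost:1883), so a broker must be reachable there.
 */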
public class MqttOutputOperatorTest
{
private static final Logger logger = LoggerFactory.getLogger(MqttOutputOperatorTest.class);

/** Topic -> payload pairs to publish; populated by the test method before the DAG is started. */
private static final Map<String, String> sendingData = new HashMap<String, String>();

/**
 * Topic -> payload pairs collected by the subscriber thread. It is written by
 * {@code GetDataThread} and read by the JUnit thread, so it must be a thread-safe map;
 * a plain HashMap gives no visibility guarantee between the two threads.
 */
private static final Map<String, String> receivedData = new ConcurrentHashMap<String, String>();

// Neither counter is referenced in the visible part of this file; kept unchanged in case
// something outside this view still reads them.
static int sentTuples = 0;
static final int totalTuples = 9;
/**
 * Output operator used by the test: every entry of an incoming tuple is published as one MQTT
 * message, with the entry key as the topic and the entry value as the payload.
 */
private static final class TestMqttOutputOperator extends AbstractSinglePortMqttOutputOperator<Map<String, String>>
{
  /**
   * Publishes each entry of {@code tuple} with QoS AT_LEAST_ONCE and the retain flag off.
   * The payload is encoded as UTF-8 so the bytes on the wire do not depend on the
   * platform default charset.
   *
   * @param tuple topic -> payload pairs to publish
   * @throws RuntimeException wrapping any exception raised by the publish call
   */
  @Override
  public void processTuple(Map<String, String> tuple)
  {
    for (Map.Entry<String, String> entry : tuple.entrySet()) {
      try {
        byte[] payload = entry.getValue().getBytes(StandardCharsets.UTF_8);
        connection.publish(entry.getKey(), payload, QoS.AT_LEAST_ONCE, false);
      }
      catch (Exception ex) {
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Subscriber that shares the enclosing operator's {@code connection}: it subscribes to every
   * topic named in {@code sendingData} and stores incoming messages in {@code receivedData}
   * until both maps have the same number of entries.
   */
  public class GetDataThread extends Thread
  {
    /**
     * Subscribes, then blocks on {@code receive()} until one message per expected topic has
     * been recorded. Any failure is rethrown as a RuntimeException on this thread.
     */
    @Override
    public void run()
    {
      try {
        int i = 0;
        Topic[] topics = new Topic[sendingData.size()];
        for (String key : sendingData.keySet()) {
          topics[i++] = new Topic(key, QoS.AT_MOST_ONCE);
        }
        connection.subscribe(topics);
        while (receivedData.size() < sendingData.size()) {
          Message msg = connection.receive();
          // Decode with the same charset processTuple() used to encode.
          receivedData.put(msg.getTopic(), new String(msg.getPayload(), StandardCharsets.UTF_8));
          // The blocking API expects each received message to be acknowledged once processed.
          msg.ack();
        }
      }
      catch (Exception ex) {
        throw new RuntimeException(ex);
      }
    }
  }
}
/**
 * Input operator for the test DAG. On activation it queues one single-entry map per entry of
 * {@code sendingData}; each {@code emitTuples()} call then emits whatever is queued at that
 * moment on {@link #outPort}.
 */
public static class SourceModule extends BaseOperator
    implements InputOperator, ActivationListener<OperatorContext>
{
  public final transient DefaultOutputPort<Map<String, String>> outPort = new DefaultOutputPort<Map<String, String>>();
  static transient ArrayBlockingQueue<Map<String, String>> holdingBuffer;

  /**
   * Allocates the (static) holding buffer.
   *
   * @param context operator context; not used here
   */
  @Override
  public void setup(OperatorContext context)
  {
    final int capacity = 1024 * 1024;
    holdingBuffer = new ArrayBlockingQueue<Map<String, String>>(capacity);
  }

  /**
   * Emits a single tuple on the output port.
   *
   * @param message tuple to emit
   */
  public void emitTuple(Map<String, String> message)
  {
    outPort.emit(message);
  }

  /**
   * Emits as many tuples as were in the buffer when the call started.
   */
  @Override
  public void emitTuples()
  {
    int pending = holdingBuffer.size();
    while (pending > 0) {
      pending--;
      emitTuple(holdingBuffer.poll());
    }
  }

  /**
   * Queues every entry of {@code sendingData} as its own one-entry map.
   *
   * @param ctx operator context; not used here
   */
  @Override
  public void activate(OperatorContext ctx)
  {
    for (Map.Entry<String, String> pair : sendingData.entrySet()) {
      Map<String, String> single = new HashMap<String, String>();
      single.put(pair.getKey(), pair.getValue());
      holdingBuffer.add(single);
    }
  }

  /** Nothing to release on deactivation. */
  @Override
  public void deactivate()
  {
  }

  /**
   * Intentionally empty.
   *
   * @param windowId ignored
   */
  public void replayTuples(long windowId)
  {
  }
}
/**
 * Runs a two-operator local DAG (source -> MQTT producer) against the broker at localhost:1883
 * and verifies that a subscriber receives the three published topic/payload pairs.
 *
 * @throws Exception if the DAG cannot be built or the test thread is interrupted while waiting
 */
@Test
public void testDag() throws Exception
{
  String host = "localhost";
  int port = 1883;
  MqttClientConfig config = new MqttClientConfig();
  config.setHost(host);
  config.setPort(port);
  config.setCleanSession(true);

  // Both maps are static; drop anything left over from an earlier run in the same JVM.
  sendingData.clear();
  receivedData.clear();
  sendingData.put("testa", "2");
  sendingData.put("testb", "20");
  sendingData.put("testc", "1000");

  LocalMode lma = LocalMode.newInstance();
  DAG dag = lma.getDAG();
  SourceModule source = dag.addOperator("source", SourceModule.class);
  TestMqttOutputOperator producer = dag.addOperator("producer", new TestMqttOutputOperator());
  producer.setMqttClientConfig(config);
  dag.addStream("Stream", source.outPort, producer.inputPort).setLocality(Locality.CONTAINER_LOCAL);

  // The subscriber uses this producer instance's connection, so the instance is set up by hand.
  // NOTE(review): that connection is never torn down in this test — confirm whether
  // producer.teardown() should be called once the consumer has finished.
  TestMqttOutputOperator.GetDataThread consumer = producer.new GetDataThread();
  producer.setup(null);
  consumer.start();

  final LocalMode.Controller lc = lma.getController();
  lc.runAsync();
  try {
    // Wait for the subscriber to collect every message, but never longer than 2 seconds.
    // An interrupt is propagated to the caller instead of being swallowed.
    consumer.join(2000);
  }
  finally {
    lc.shutdown();
  }

  Assert.assertEquals("emitted value for testNum was ", 3, receivedData.size());
  // Assert on the expected keys directly so that a missing topic fails the test
  // instead of being skipped.
  Assert.assertEquals("emitted value for 'testa' was ", "2", receivedData.get("testa"));
  Assert.assertEquals("emitted value for 'testb' was ", "20", receivedData.get("testb"));
  Assert.assertEquals("emitted value for 'testc' was ", "1000", receivedData.get("testc"));
  logger.debug("end of test");
}
}