blob: cf66d1af3709bd44595dd1ef10e2a44248dfef11 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt;
import io.moquette.proto.messages.PublishMessage;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
import org.apache.nifi.processors.mqtt.common.MqttTestUtils;
import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestConsumeMQTT extends TestConsumeMqttCommon {
public MqttTestClient mqttTestClient;
public class UnitTestableConsumeMqtt extends ConsumeMQTT {
public UnitTestableConsumeMqtt(){
super();
}
@Override
public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Subscriber);
return mqttTestClient;
}
}
@Before
public void init() throws IOException {
PUBLISH_WAIT_MS = 0;
broker = "tcp://localhost:1883";
UnitTestableConsumeMqtt proc = new UnitTestableConsumeMqtt();
testRunner = TestRunners.newTestRunner(proc);
testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker);
testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
}
@Test
public void testSSLContextServiceTruststoreOnly() throws InitializationException {
String brokerURI = "ssl://localhost:8883";
TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
runner.setVariable("brokerURI", brokerURI);
runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "${brokerURI}");
runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
final StandardSSLContextService sslService = new StandardSSLContextService();
Map<String, String> sslProperties = MqttTestUtils.createSslPropertiesTruststoreOnly();
runner.addControllerService("ssl-context", sslService, sslProperties);
runner.enableControllerService(sslService);
runner.setProperty(ConsumeMQTT.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
try {
ConsumeMQTT processor = (ConsumeMQTT) runner.getProcessor();
processor.onScheduled(runner.getProcessContext());
} catch (Exception e) {
e.printStackTrace();
fail("Unexpected error");
}
}
/**
* If the session.commit() fails, we should not remove the unprocessed message
*/
@Test
public void testMessageNotConsumedOnCommitFail() throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
testRunner.run(1, false);
ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
MQTTQueueMessage mock = mock(MQTTQueueMessage.class);
when(mock.getPayload()).thenReturn(new byte[0]);
when(mock.getTopic()).thenReturn("testTopic");
BlockingQueue<MQTTQueueMessage> mqttQueue = getMqttQueue(processor);
mqttQueue.add(mock);
try {
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
transferQueue(processor,
(ProcessSession) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] { ProcessSession.class }, (proxy, method, args) -> {
if (method.getName().equals("commit")) {
throw new RuntimeException();
} else {
return method.invoke(session, args);
}
}));
fail("Expected runtime exception");
} catch (InvocationTargetException e) {
assertTrue("Expected generic runtime exception, not " + e, e.getCause() instanceof RuntimeException);
}
assertTrue("Expected mqttQueue to contain uncommitted message.", mqttQueue.contains(mock));
}
@After
public void tearDown() throws Exception {
if (MQTT_server != null) {
MQTT_server.stopServer();
}
final File folder = new File("./target");
final File[] files = folder.listFiles( new FilenameFilter() {
@Override
public boolean accept( final File dir,
final String name ) {
return name.matches( "moquette_store.mapdb.*" );
}
} );
for ( final File file : files ) {
if ( !file.delete() ) {
System.err.println( "Can't remove " + file.getAbsolutePath() );
}
}
}
@Override
public void internalPublish(PublishMessage publishMessage) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(publishMessage.getPayload().array());
mqttMessage.setRetained(publishMessage.isRetainFlag());
mqttMessage.setQos(publishMessage.getQos().ordinal());
try {
mqttTestClient.publish(publishMessage.getTopicName(), mqttMessage);
} catch (MqttException e) {
fail("Should never get an MqttException when publishing using test client");
}
}
}