blob: 6a5b5f194735273b3f0bbf83130d09fc2b59fe6d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import io.moquette.proto.messages.AbstractMessage;
import io.moquette.proto.messages.PublishMessage;
import io.moquette.server.Server;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.mqtt.ConsumeMQTT;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.Test;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public abstract class TestConsumeMqttCommon {
public int PUBLISH_WAIT_MS = 1000;
public static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
public Server MQTT_server;
public TestRunner testRunner;
public String broker;
public abstract void internalPublish(PublishMessage publishMessage);
@Test
public void testClientIDConfiguration() {
TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
runner.assertValid();
runner.setProperty(ConsumeMQTT.PROP_GROUPID, "group");
runner.assertNotValid();
runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "${hostname()}");
runner.assertValid();
runner.removeProperty(ConsumeMQTT.PROP_CLIENTID);
runner.assertValid();
}
@Test
public void testLastWillConfig() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill message");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_TOPIC, "lastWill topic");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_QOS, "1");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_RETAIN, "false");
testRunner.assertValid();
}
@Test
public void testQoS2() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
testMessage.setRetainFlag(false);
internalPublish(testMessage);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
assertProvenanceEvents(1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS2NotCleanSession() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
testMessage.setRetainFlag(false);
internalPublish(testMessage);
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
assertProvenanceEvents(1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS1() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
testMessage.setRetainFlag(false);
internalPublish(testMessage);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() > 0);
assertProvenanceEvents(flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS1NotCleanSession() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
testMessage.setRetainFlag(false);
internalPublish(testMessage);
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() > 0);
assertProvenanceEvents(flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS0() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "0");
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
testMessage.setRetainFlag(false);
internalPublish(testMessage);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() < 2);
assertProvenanceEvents(flowFiles.size());
if(flowFiles.size() == 1) {
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "0");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
}
@Test
public void testOnStoppedFinish() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.assertValid();
MqttMessage innerMessage = new MqttMessage();
innerMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()).array());
innerMessage.setQos(2);
MQTTQueueMessage testMessage = new MQTTQueueMessage("testTopic", innerMessage);
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
consumeMQTT.processSessionFactory = testRunner.getProcessSessionFactory();
Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
f.setAccessible(true);
LinkedBlockingQueue<MQTTQueueMessage> queue = (LinkedBlockingQueue<MQTTQueueMessage>) f.get(consumeMQTT);
queue.add(testMessage);
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
consumeMQTT.onStopped(testRunner.getProcessContext());
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
assertProvenanceEvents(1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testResizeBuffer() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "2");
testRunner.assertValid();
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
testMessage.setRetainFlag(false);
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
internalPublish(testMessage);
internalPublish(testMessage);
Thread.sleep(PUBLISH_WAIT_MS);
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "1");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "3");
testRunner.assertValid();
testRunner.run(1);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 2);
assertProvenanceEvents(2);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testConsumeRecordsWithAddedFields() throws Exception {
testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
final JsonTreeReader jsonReader = new JsonTreeReader();
testRunner.addControllerService("record-reader", jsonReader);
testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
testRunner.enableControllerService(jsonReader);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
testRunner.addControllerService("record-writer", jsonWriter);
testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
testRunner.enableControllerService(jsonWriter);
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache NiFi\"}".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
testMessage.setRetainFlag(false);
PublishMessage badMessage = new PublishMessage();
badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
badMessage.setTopicName("testTopic");
badMessage.setDupFlag(false);
badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
badMessage.setRetainFlag(false);
internalPublish(testMessage);
internalPublish(badMessage);
internalPublish(testMessage);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() == 1);
assertEquals("[{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
+ "{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
new String(flowFiles.get(0).toByteArray()));
List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertTrue(badFlowFiles.size() == 1);
assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
// clean runner by removing records reader/writer
testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
}
@Test
public void testConsumeDemarcator() throws Exception {
testRunner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\\n");
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache NiFi\"}".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
testMessage.setRetainFlag(false);
PublishMessage badMessage = new PublishMessage();
badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
badMessage.setTopicName("testTopic");
badMessage.setDupFlag(false);
badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
badMessage.setRetainFlag(false);
internalPublish(testMessage);
internalPublish(badMessage);
internalPublish(testMessage);
Thread.sleep(PUBLISH_WAIT_MS);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertEquals(flowFiles.size(), 1);
assertEquals("{\"name\":\"Apache NiFi\"}\\n"
+ THIS_IS_NOT_JSON + "\\n"
+ "{\"name\":\"Apache NiFi\"}\\n",
new String(flowFiles.get(0).toByteArray()));
List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertTrue(badFlowFiles.size() == 0);
// clean runner by removing message demarcator
testRunner.removeProperty(ConsumeMQTT.MESSAGE_DEMARCATOR);
}
@Test
public void testConsumeRecordsWithoutAddedFields() throws Exception {
testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
final JsonTreeReader jsonReader = new JsonTreeReader();
testRunner.addControllerService("record-reader", jsonReader);
testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
testRunner.enableControllerService(jsonReader);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
testRunner.addControllerService("record-writer", jsonWriter);
testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
testRunner.enableControllerService(jsonWriter);
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache NiFi\"}".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
testMessage.setRetainFlag(false);
PublishMessage badMessage = new PublishMessage();
badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
badMessage.setTopicName("testTopic");
badMessage.setDupFlag(false);
badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
badMessage.setRetainFlag(false);
internalPublish(testMessage);
internalPublish(badMessage);
internalPublish(testMessage);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() == 1);
assertEquals("[{\"name\":\"Apache NiFi\"},{\"name\":\"Apache NiFi\"}]", new String(flowFiles.get(0).toByteArray()));
List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertTrue(badFlowFiles.size() == 1);
assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
// clean runner by removing records reader/writer
testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
}
@Test
public void testConsumeRecordsOnlyBadData() throws Exception {
testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
final JsonTreeReader jsonReader = new JsonTreeReader();
testRunner.addControllerService("record-reader", jsonReader);
testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
testRunner.enableControllerService(jsonReader);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
testRunner.addControllerService("record-writer", jsonWriter);
testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
testRunner.enableControllerService(jsonWriter);
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
PublishMessage badMessage = new PublishMessage();
badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
badMessage.setTopicName("testTopic");
badMessage.setDupFlag(false);
badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
badMessage.setRetainFlag(false);
internalPublish(badMessage);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertTrue(badFlowFiles.size() == 1);
assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
// clean runner by removing records reader/writer
testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
}
private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException {
Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
f.setAccessible(true);
IMqttClient mqttClient = (IMqttClient) f.get(processor);
return mqttClient.isConnected();
}
public static void reconnect(ConsumeMQTT processor, ProcessContext context) throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
Method method = ConsumeMQTT.class.getDeclaredMethod("initializeClient", ProcessContext.class);
method.setAccessible(true);
method.invoke(processor, context);
}
public static BlockingQueue<MQTTQueueMessage> getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue");
mqttQueueField.setAccessible(true);
return (BlockingQueue<MQTTQueueMessage>) mqttQueueField.get(consumeMQTT);
}
public static void transferQueue(ConsumeMQTT consumeMQTT, ProcessSession session) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Method transferQueue = ConsumeMQTT.class.getDeclaredMethod("transferQueue", ProcessSession.class);
transferQueue.setAccessible(true);
transferQueue.invoke(consumeMQTT, session);
}
private void assertProvenanceEvents(int count){
List<ProvenanceEventRecord> provenanceEvents = testRunner.getProvenanceEvents();
assertNotNull(provenanceEvents);
assertEquals(count, provenanceEvents.size());
if (count > 0) {
assertEquals(ProvenanceEventType.RECEIVE, provenanceEvents.get(0).getEventType());
}
}
}