NIFI-8409 - ConsumeMQTT Processor Broker URI and Username Expression Language
This closes #4984
Signed-off-by: David Handermann <exceptionfactory@apache.org>
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index 3d34d94..a83bcc5 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -119,6 +119,7 @@
.description("The URI to use to connect to the MQTT broker (e.g. tcp://localhost:1883). The 'tcp', 'ssl', 'ws' and 'wss' schemes are supported. In order to use 'ssl', the SSL Context " +
"Service property must be set.")
.required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(BROKER_VALIDATOR)
.build();
@@ -135,6 +136,7 @@
.name("Username")
.description("Username to use when connecting to the broker")
.required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -275,7 +277,7 @@
}
try {
- URI brokerURI = new URI(validationContext.getProperty(PROP_BROKER_URI).getValue());
+ URI brokerURI = new URI(validationContext.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue());
if (brokerURI.getScheme().equalsIgnoreCase("ssl") && !validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet()) {
results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName() + " or " + PROP_BROKER_URI.getName()).valid(false).explanation("if the 'ssl' scheme is used in " +
"the broker URI, the SSL Context Service must be set.").build());
@@ -314,7 +316,7 @@
}
protected void onScheduled(final ProcessContext context){
- broker = context.getProperty(PROP_BROKER_URI).getValue();
+ broker = context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue();
brokerUri = broker.endsWith("/") ? broker : broker + "/";
clientID = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
@@ -345,7 +347,7 @@
PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
if(usernameProp.isSet()) {
- connOpts.setUserName(usernameProp.getValue());
+ connOpts.setUserName(usernameProp.evaluateAttributeExpressions().getValue());
connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
}
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index d8c86d6..cf66d1a 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -82,7 +82,8 @@
public void testSSLContextServiceTruststoreOnly() throws InitializationException {
String brokerURI = "ssl://localhost:8883";
TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
- runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, brokerURI);
+ runner.setVariable("brokerURI", brokerURI);
+ runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "${brokerURI}");
runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");