NIFI-8409 - ConsumeMQTT Processor Broker URI and Username Expression Language

This closes #4984

Signed-off-by: David Handermann <exceptionfactory@apache.org>
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index 3d34d94..a83bcc5 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -119,6 +119,7 @@
             .description("The URI to use to connect to the MQTT broker (e.g. tcp://localhost:1883). The 'tcp', 'ssl', 'ws' and 'wss' schemes are supported. In order to use 'ssl', the SSL Context " +
                     "Service property must be set.")
             .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(BROKER_VALIDATOR)
             .build();
 
@@ -135,6 +136,7 @@
             .name("Username")
             .description("Username to use when connecting to the broker")
             .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -275,7 +277,7 @@
         }
 
         try {
-            URI brokerURI = new URI(validationContext.getProperty(PROP_BROKER_URI).getValue());
+            URI brokerURI = new URI(validationContext.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue());
             if (brokerURI.getScheme().equalsIgnoreCase("ssl") && !validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet()) {
                 results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName() + " or " + PROP_BROKER_URI.getName()).valid(false).explanation("if the 'ssl' scheme is used in " +
                         "the broker URI, the SSL Context Service must be set.").build());
@@ -314,7 +316,7 @@
     }
 
     protected void onScheduled(final ProcessContext context){
-        broker = context.getProperty(PROP_BROKER_URI).getValue();
+        broker = context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue();
         brokerUri = broker.endsWith("/") ? broker : broker + "/";
         clientID = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
 
@@ -345,7 +347,7 @@
 
         PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
         if(usernameProp.isSet()) {
-            connOpts.setUserName(usernameProp.getValue());
+            connOpts.setUserName(usernameProp.evaluateAttributeExpressions().getValue());
             connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
         }
     }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index d8c86d6..cf66d1a 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -82,7 +82,8 @@
     public void testSSLContextServiceTruststoreOnly() throws InitializationException {
         String brokerURI = "ssl://localhost:8883";
         TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
-        runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, brokerURI);
+        runner.setVariable("brokerURI", brokerURI);
+        runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "${brokerURI}");
         runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
         runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
         runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");