blob: a83bcc5e64004d58b99e80e87457cc56da2fb4d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProcessor {
public static int DISCONNECT_TIMEOUT = 5000;
protected ComponentLog logger;
protected IMqttClient mqttClient;
protected volatile String broker;
protected volatile String brokerUri;
protected volatile String clientID;
protected MqttConnectOptions connOpts;
protected MemoryPersistence persistence = new MemoryPersistence();
public ProcessSessionFactory processSessionFactory;
public static final Validator QOS_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
Integer inputInt = Integer.parseInt(input);
if (inputInt < 0 || inputInt > 2) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must be an integer between 0 and 2").build();
}
return new ValidationResult.Builder().subject(subject).valid(true).build();
}
};
public static final Validator BROKER_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
try{
URI brokerURI = new URI(input);
if (!"".equals(brokerURI.getPath())) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
}
if (!("tcp".equals(brokerURI.getScheme()) || "ssl".equals(brokerURI.getScheme()) || "ws".equals(brokerURI.getScheme()) || "wss".equals(brokerURI.getScheme()))) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation("only the 'tcp', 'ssl', 'ws' and 'wss' schemes are supported.").build();
}
} catch (URISyntaxException e) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation("it is not valid URI syntax.").build();
}
return new ValidationResult.Builder().subject(subject).valid(true).build();
}
};
public static final Validator RETAIN_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
if("true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input)){
return new ValidationResult.Builder().subject(subject).valid(true).build();
} else{
return StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false)
.validate(subject, input, context);
}
}
};
public static final PropertyDescriptor PROP_BROKER_URI = new PropertyDescriptor.Builder()
.name("Broker URI")
.description("The URI to use to connect to the MQTT broker (e.g. tcp://localhost:1883). The 'tcp', 'ssl', 'ws' and 'wss' schemes are supported. In order to use 'ssl', the SSL Context " +
"Service property must be set.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(BROKER_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_CLIENTID = new PropertyDescriptor.Builder()
.name("Client ID")
.description("MQTT client ID to use. If not set, a UUID will be generated.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_USERNAME = new PropertyDescriptor.Builder()
.name("Username")
.description("Username to use when connecting to the broker")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("Password to use when connecting to the broker")
.sensitive(true)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor PROP_LAST_WILL_TOPIC = new PropertyDescriptor.Builder()
.name("Last Will Topic")
.description("The topic to send the client's Last Will to. If the Last Will topic and message are not set then a Last Will will not be sent.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_LAST_WILL_MESSAGE = new PropertyDescriptor.Builder()
.name("Last Will Message")
.description("The message to send as the client's Last Will. If the Last Will topic and message are not set then a Last Will will not be sent.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_LAST_WILL_RETAIN = new PropertyDescriptor.Builder()
.name("Last Will Retain")
.description("Whether to retain the client's Last Will. If the Last Will topic and message are not set then a Last Will will not be sent.")
.required(false)
.allowableValues("true","false")
.build();
public static final PropertyDescriptor PROP_LAST_WILL_QOS = new PropertyDescriptor.Builder()
.name("Last Will QoS Level")
.description("QoS level to be used when publishing the Last Will Message")
.required(false)
.allowableValues(
ALLOWABLE_VALUE_QOS_0,
ALLOWABLE_VALUE_QOS_1,
ALLOWABLE_VALUE_QOS_2
)
.build();
public static final PropertyDescriptor PROP_CLEAN_SESSION = new PropertyDescriptor.Builder()
.name("Session state")
.description("Whether to start afresh or resume previous flows. See the allowable value descriptions for more details.")
.required(true)
.allowableValues(
ALLOWABLE_VALUE_CLEAN_SESSION_TRUE,
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE
)
.defaultValue(ALLOWABLE_VALUE_CLEAN_SESSION_TRUE.getValue())
.build();
public static final PropertyDescriptor PROP_MQTT_VERSION = new PropertyDescriptor.Builder()
.name("MQTT Specification Version")
.description("The MQTT specification version when connecting with the broker. See the allowable value descriptions for more details.")
.allowableValues(
ALLOWABLE_VALUE_MQTT_VERSION_AUTO,
ALLOWABLE_VALUE_MQTT_VERSION_311,
ALLOWABLE_VALUE_MQTT_VERSION_310
)
.defaultValue(ALLOWABLE_VALUE_MQTT_VERSION_AUTO.getValue())
.required(true)
.build();
public static final PropertyDescriptor PROP_CONN_TIMEOUT = new PropertyDescriptor.Builder()
.name("Connection Timeout (seconds)")
.description("Maximum time interval the client will wait for the network connection to the MQTT server " +
"to be established. The default timeout is 30 seconds. " +
"A value of 0 disables timeout processing meaning the client will wait until the network connection is made successfully or fails.")
.required(false)
.defaultValue("30")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_KEEP_ALIVE_INTERVAL = new PropertyDescriptor.Builder()
.name("Keep Alive Interval (seconds)")
.description("Defines the maximum time interval between messages sent or received. It enables the " +
"client to detect if the server is no longer available, without having to wait for the TCP/IP timeout. " +
"The client will ensure that at least one message travels across the network within each keep alive period. In the absence of a data-related message during the time period, " +
"the client sends a very small \"ping\" message, which the server will acknowledge. A value of 0 disables keepalive processing in the client.")
.required(false)
.defaultValue("60")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static List<PropertyDescriptor> getAbstractPropertyDescriptors(){
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(PROP_BROKER_URI);
descriptors.add(PROP_CLIENTID);
descriptors.add(PROP_USERNAME);
descriptors.add(PROP_PASSWORD);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
descriptors.add(PROP_LAST_WILL_TOPIC);
descriptors.add(PROP_LAST_WILL_MESSAGE);
descriptors.add(PROP_LAST_WILL_RETAIN);
descriptors.add(PROP_LAST_WILL_QOS);
descriptors.add(PROP_CLEAN_SESSION);
descriptors.add(PROP_MQTT_VERSION);
descriptors.add(PROP_CONN_TIMEOUT);
descriptors.add(PROP_KEEP_ALIVE_INTERVAL);
return descriptors;
}
@Override
public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(1);
final boolean usernameSet = validationContext.getProperty(PROP_USERNAME).isSet();
final boolean passwordSet = validationContext.getProperty(PROP_PASSWORD).isSet();
if ((usernameSet && !passwordSet) || (!usernameSet && passwordSet)) {
results.add(new ValidationResult.Builder().subject("Username and Password").valid(false).explanation("if username or password is set, both must be set").build());
}
final boolean lastWillTopicSet = validationContext.getProperty(PROP_LAST_WILL_TOPIC).isSet();
final boolean lastWillMessageSet = validationContext.getProperty(PROP_LAST_WILL_MESSAGE).isSet();
final boolean lastWillRetainSet = validationContext.getProperty(PROP_LAST_WILL_RETAIN).isSet();
final boolean lastWillQosSet = validationContext.getProperty(PROP_LAST_WILL_QOS).isSet();
// If any of the Last Will Properties are set
if (lastWillTopicSet || lastWillMessageSet || lastWillRetainSet || lastWillQosSet) {
// And any are not set
if(!(lastWillTopicSet && lastWillMessageSet && lastWillRetainSet && lastWillQosSet)){
// Then mark as invalid
results.add(new ValidationResult.Builder().subject("Last Will Properties").valid(false).explanation("if any of the Last Will Properties (message, topic, retain and QoS) are " +
"set, all must be set.").build());
}
}
try {
URI brokerURI = new URI(validationContext.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue());
if (brokerURI.getScheme().equalsIgnoreCase("ssl") && !validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet()) {
results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName() + " or " + PROP_BROKER_URI.getName()).valid(false).explanation("if the 'ssl' scheme is used in " +
"the broker URI, the SSL Context Service must be set.").build());
}
} catch (URISyntaxException e) {
results.add(new ValidationResult.Builder().subject(PROP_BROKER_URI.getName()).valid(false).explanation("it is not valid URI syntax.").build());
}
return results;
}
public static Properties transformSSLContextService(SSLContextService sslContextService){
Properties properties = new Properties();
if (sslContextService.getSslAlgorithm() != null) {
properties.setProperty("com.ibm.ssl.protocol", sslContextService.getSslAlgorithm());
}
if (sslContextService.getKeyStoreFile() != null) {
properties.setProperty("com.ibm.ssl.keyStore", sslContextService.getKeyStoreFile());
}
if (sslContextService.getKeyStorePassword() != null) {
properties.setProperty("com.ibm.ssl.keyStorePassword", sslContextService.getKeyStorePassword());
}
if (sslContextService.getKeyStoreType() != null) {
properties.setProperty("com.ibm.ssl.keyStoreType", sslContextService.getKeyStoreType());
}
if (sslContextService.getTrustStoreFile() != null) {
properties.setProperty("com.ibm.ssl.trustStore", sslContextService.getTrustStoreFile());
}
if (sslContextService.getTrustStorePassword() != null) {
properties.setProperty("com.ibm.ssl.trustStorePassword", sslContextService.getTrustStorePassword());
}
if (sslContextService.getTrustStoreType() != null) {
properties.setProperty("com.ibm.ssl.trustStoreType", sslContextService.getTrustStoreType());
}
return properties;
}
protected void onScheduled(final ProcessContext context){
broker = context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue();
brokerUri = broker.endsWith("/") ? broker : broker + "/";
clientID = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
if (clientID == null) {
clientID = UUID.randomUUID().toString();
}
connOpts = new MqttConnectOptions();
connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger());
connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
if (sslProp.isSet()) {
Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService());
connOpts.setSSLProperties(sslProps);
}
PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC);
if (lastWillTopicProp.isSet()){
String lastWillMessage = context.getProperty(PROP_LAST_WILL_MESSAGE).getValue();
PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN);
Integer lastWillQOS = context.getProperty(PROP_LAST_WILL_QOS).asInteger();
connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false);
}
PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
if(usernameProp.isSet()) {
connOpts.setUserName(usernameProp.evaluateAttributeExpressions().getValue());
connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
}
}
protected void onStopped() {
try {
logger.info("Disconnecting client");
mqttClient.disconnect(DISCONNECT_TIMEOUT);
} catch(MqttException me) {
logger.error("Error disconnecting MQTT client due to {}", new Object[]{me.getMessage()}, me);
}
try {
logger.info("Closing client");
mqttClient.close();
mqttClient = null;
} catch (MqttException me) {
logger.error("Error closing MQTT client due to {}", new Object[]{me.getMessage()}, me);
}
}
protected IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
return new MqttClient(broker, clientID, persistence);
}
@Override
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
if (processSessionFactory == null) {
processSessionFactory = sessionFactory;
}
ProcessSession session = sessionFactory.createSession();
try {
onTrigger(context, session);
session.commit();
} catch (final Throwable t) {
getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
session.rollback(true);
throw t;
}
}
public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
protected boolean isConnected(){
return (mqttClient != null && mqttClient.isConnected());
}
}