blob: 4880b5703d88395f4c732d83817a6482bb48f4c9 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.connectors.mqtt;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.edgent.function.Consumer;
import org.apache.edgent.topology.json.JsonFunctions;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.google.gson.JsonObject;
/**
* MQTT broker connector configuration.
*/
public class MqttConfig {
private MqttConnectOptions options = new MqttConnectOptions();
private String clientId;
private MqttClientPersistence persistence;
private long actionTimeToWaitMillis = -1;
private int idleTimeout;
private int subscriberIdleReconnectIntervalSec = 60;
/**
* Create a new configuration from {@link Properties}.
* <p>
* There is a property corresponding to each {@code MqttConfig.set<name>()}
* method. Unless otherwise stated, the property's value is a string
* of the corresponding method's argument type.
* Properties not specified yield a configuration value as
* described by and their corresponding {@code set<name>()}.
* <p>
* Properties other than those noted are ignored.
*
* <h3>MQTT connector properties</h3>
* <ul>
* <li>mqtt.actionTimeToWaitMillis</li>
* <li>mqtt.cleanSession</li>
* <li>mqtt.clientId</li>
* <li>mqtt.connectionTimeoutSec</li>
* <li>mqtt.idleTimeoutSec</li>
* <li>mqtt.keepAliveSec</li>
* <li>mqtt.keyStore - optional. Only used with "ssl:" serverURL when the
* server is configured for client auth.
* Path to key store file in JKS format.
* The first key in the store is used. The key must have the same
* password as the store if any.
* If not set, the standard JRE and javax.net.ssl system properties
* control the SSL behavior.</li>
* <li>mqtt.keyStorePassword - required if mqtt.keyStore is set.</li>
* <li>mqtt.password</li>
* <li>mqtt.persistence</li>
* <li>mqtt.serverURLs - csv list of MQTT URLs of the form:
* {@code tcp://<host>:<port>} or
* {@code ssl://<host>:<port>}
* </li>
* <li>mqtt.subscriberIdleReconnectIntervalSec</li>
* <li>mqtt.trustStore - optional. Only used with "ssl:" serverURL.
* Path to trust store file in JKS format.
* If not set, the standard JRE and javax.net.ssl system properties
* control the SSL behavior.
* Generally not required if server has a CA-signed certificate.</li>
* <li>mqtt.trustStorePassword - required if mqtt.trustStore is set</li>
* <li>mqtt.userName</li>
* <li>mqtt.will - JSON for with the following properties:
* <ul>
* <li>topic - string</li>
* <li>payload - string for byte[] in UTF8</li>
* <li>qos - integer</li>
* <li>retained - boolean</li>
* </ul>
* </li>
* </ul>
* @param properties properties specifying the configuration.
*
* @return the configuration
* @throws IllegalArgumentException for illegal values
*/
public static MqttConfig fromProperties(Properties properties) {
MqttConfig config = new MqttConfig();
Properties p = properties;
setConfig(p, "mqtt.actionTimeToWaitMillis",
val -> config.setActionTimeToWaitMillis(Long.valueOf(val)));
setConfig(p, "mqtt.cleanSession",
val -> config.setCleanSession(Boolean.valueOf(val)));
setConfig(p, "mqtt.clientId",
val -> config.setClientId(val));
setConfig(p, "mqtt.connectionTimeoutSec",
val -> config.setConnectionTimeout(Integer.valueOf(val)));
setConfig(p, "mqtt.idleTimeoutSec",
val -> config.setIdleTimeout(Integer.valueOf(val)));
setConfig(p, "mqtt.keepAliveSec",
val -> config.setKeepAliveInterval(Integer.valueOf(val)));
setConfig(p, "mqtt.keyStore",
val -> config.setKeyStore(val));
setConfig(p, "mqtt.keyStorePassword",
val -> config.setKeyStorePassword(val.toCharArray()));
// paho MqttConnectOptions.setSslProperties() doesn't support this control
// setConfig(p, "mqtt.keyPassword",
// val -> config.setKeyPassword(val.toCharArray()));
// setConfig(p, "mqtt.keyCertificateAlias",
// val -> config.setKeyCertificateAlias(val));
setConfig(p, "mqtt.password",
val -> config.setPassword(val.toCharArray()));
setConfig(p, "mqtt.persistence",
val -> config.setPersistence(newPersistenceProvider(val)));
setConfig(p, "mqtt.serverURLs",
val -> config.setServerURLs(val.trim().split(",")));
setConfig(p, "mqtt.subscriberIdleReconnectIntervalSec",
val -> config.setSubscriberIdleReconnectInterval(Integer.valueOf(val)));
setConfig(p, "mqtt.trustStore",
val -> config.setTrustStore(val));
setConfig(p, "mqtt.trustStorePassword",
val -> config.setTrustStorePassword(val.toCharArray()));
setConfig(p, "mqtt.userName",
val -> config.setUserName(val));
setConfig(p, "mqtt.will", val -> {
JsonObject jo = JsonFunctions.fromString().apply(val);
String topic = jo.get("topic").getAsString();
byte[] payload = jo.get("payload").getAsString().getBytes(StandardCharsets.UTF_8);
int qos = jo.get("qos").getAsInt();
boolean retained = jo.get("retained").getAsBoolean();
config.setWill(topic, payload, qos, retained);
});
return config;
}
private static void setConfig(Properties p, String name, Consumer<String> setter) {
try {
String value = p.getProperty(name);
if (value != null) {
setter.accept(value);
}
} catch (Exception e) {
throw new IllegalArgumentException(name, e);
}
}
private static MqttClientPersistence newPersistenceProvider(String className) {
Class<?> clazz = null;
try {
clazz = Class.forName(className);
return (MqttClientPersistence) clazz.newInstance();
}
catch (Exception e) {
throw new IllegalArgumentException(e.getLocalizedMessage(), e);
}
}
/**
* Create a new configuration.
*/
public MqttConfig() { }
/**
* Create a new configuration.
* @param serverURL the MQTT broker's URL
* @param clientId the MQTT client's id. Auto-generated if null.
*/
public MqttConfig(String serverURL, String clientId) {
options.setServerURIs(new String[] {serverURL});
this.clientId = clientId;
}
/**
* Get the connection Client Id.
* @return the value
*/
public String getClientId() {
return clientId;
}
/**
* Get the maximum time to wait for an action (e.g., publish message) to complete.
* @return the value
*/
public long getActionTimeToWaitMillis() {
return actionTimeToWaitMillis;
}
/**
* Get the QoS 1 and 2 in-flight message persistence handler.
* @return the value
*/
public MqttClientPersistence getPersistence() {
return persistence;
}
/**
* Get the connection timeout.
* @return the value
*/
public int getConnectionTimeout() {
return options.getConnectionTimeout();
}
/**
* Get the idle connection timeout.
* @return the value
*/
public int getIdleTimeout() {
return idleTimeout;
}
/**
* Get the subscriber idle reconnect interval.
* @return the value
*/
public int getSubscriberIdleReconnectInterval() {
return subscriberIdleReconnectIntervalSec;
}
/**
* Get the connection Keep alive interval.
* @return the value
*/
public int getKeepAliveInterval() {
return options.getKeepAliveInterval();
}
/**
* Get the MQTT Server URLs
* @return the value
*/
public String[] getServerURLs() {
return options.getServerURIs();
}
/**
* Get a Last Will and Testament message's destination topic.
* @return the value. may be null.
*/
public String getWillDestination() {
return options.getWillDestination();
}
/**
* Get a Last Will and Testament message's payload.
* @return the value. may be null.
*/
public byte[] getWillPayload() {
MqttMessage msg = options.getWillMessage();
return msg==null ? null : msg.getPayload();
}
/**
* Get a Last Will and Testament message's QOS.
* @return the value.
*/
public int getWillQOS() {
MqttMessage msg = options.getWillMessage();
return msg==null ? 0 : msg.getQos();
}
/**
* Get a Last Will and Testament message's "retained" setting.
* @return the value.
*/
public boolean getWillRetained() {
MqttMessage msg = options.getWillMessage();
return msg==null ? false : msg.isRetained();
}
/**
* Get the clean session setting.
* @return the value
*/
public boolean isCleanSession() {
return options.isCleanSession();
}
/**
* Get the the password to use for authentication with the server.
* @return the value
*/
public char[] getPassword() {
return options.getPassword();
}
/**
* Get the username to use for authentication with the server.
* @return the value
*/
public String getUserName() {
return options.getUserName();
}
/**
* Connection Client Id.
* <p>
* Optional. default null: a clientId is auto-generated.
* @param clientId the client id
*/
public void setClientId(String clientId) {
this.clientId = clientId;
}
/**
* Maximum time to wait for an action (e.g., publish message) to complete.
* <p>
* Optional. default: -1 no timeout. 0 also means no timeout.
* @param actionTimeToWaitMillis the time to wait in milliseconds
*/
public void setActionTimeToWaitMillis(long actionTimeToWaitMillis) {
this.actionTimeToWaitMillis = actionTimeToWaitMillis;
}
/**
* QoS 1 and 2 in-flight message persistence.
* <p>
* optional. default: use memory persistence.
* @param persistence the persistence implementation
*/
public void setPersistence(MqttClientPersistence persistence) {
this.persistence = persistence;
}
/**
* Clean Session.
* <p>
* Qptional. default: true.
* @param cleanSession the clean session value
*/
public void setCleanSession(boolean cleanSession) {
options.setCleanSession(cleanSession);
}
/**
* Connection timeout.
* Optional. 0 disables the timeout / blocks until connected. default: 30 seconds.
* @param connectionTimeoutSec the timeout in seconds
*/
public void setConnectionTimeout(int connectionTimeoutSec) {
options.setConnectionTimeout(connectionTimeoutSec);
}
/**
* Idle connection timeout.
* Optional. 0 disables idle connection disconnect. default: 0 seconds (disabled).
* <p>
* Following an idle disconnect, the connector will automatically
* reconnect when it receives a new tuple to publish.
* If the connector is subscribing to topics, it will also reconnect
* as per {@link #setSubscriberIdleReconnectInterval(int)}.
* <p>
* @param idleTimeoutSec the timeout in seconds
* @see #setSubscriberIdleReconnectInterval(int)
*/
public void setIdleTimeout(int idleTimeoutSec) {
if (idleTimeoutSec < 0)
idleTimeoutSec = 0;
this.idleTimeout = idleTimeoutSec;
}
/**
* Subscriber idle reconnect interval.
* <p>
* Following an idle disconnect, if the connector is subscribing to topics,
* it will reconnect after the specified interval.
* Optional. default: 60 seconds.
* @param seconds the interval in seconds
*/
public void setSubscriberIdleReconnectInterval(int seconds) {
if (seconds < 0)
seconds = 0;
subscriberIdleReconnectIntervalSec = seconds;
}
/**
* Connection Keep alive.
* <p>
* Optional. 0 disables keepalive processing. default: 60 seconds.
* @param keepAliveSec the interval in seconds
*/
public void setKeepAliveInterval(int keepAliveSec) {
options.setKeepAliveInterval(keepAliveSec);
}
/**
* MQTT Server URLs
* <p>
* Required. Must be an array of one or more MQTT server URLs.
* When connecting, the first URL that successfully connects is used.
* @param serverUrls the URLs
*/
public void setServerURLs(String[] serverUrls) {
options.setServerURIs(serverUrls);
}
/**
* Last Will and Testament.
* <p>
* optional. default: no last-will-and-testament.
* @param topic topic to publish to
* @param payload the last-will-and-testament message value
* @param qos the quality of service to use to publish the message
* @param retained true to retain the message across connections
*/
public void setWill(String topic, byte[] payload, int qos, boolean retained) {
options.setWill(topic, payload, qos, retained);
}
/**
* Set the password to use for authentication with the server.
* Optional. default: null.
* @param password the password
*/
public void setPassword(char[] password) {
options.setPassword(password);
}
/**
* Set the username to use for authentication with the server.
* Optional. default: null.
* @param userName the user name
*/
public void setUserName(String userName) {
options.setUserName(userName);
}
/**
* @param name option name
* @param value option value. null to unset.
*/
private void setSslOption(String name, String value) {
Properties props = options.getSSLProperties();
if (props == null)
props = new Properties();
if (value == null)
props.remove(name);
else
props.setProperty(name, value);
options.setSSLProperties(props);
}
/**
* @param name option name
* @return option's value. null if not set.
*/
private String getSslOption(String name) {
Properties props = options.getSSLProperties();
return props == null ? null : props.getProperty(name);
}
/**
* Set the SSL trust store path.
* <p>
* Only used with "ssl:" serverURL.
* Path to trust store file in JKS format.
* If not set, the standard JRE and javax.net.ssl system properties
* control the SSL behavior.
* Generally not required if server has a CA-signed certificate.
* @param path the path. null to unset.
*/
public void setTrustStore(String path) {
setSslOption("com.ibm.ssl.trustStore", path);
}
/**
* Get the SSL trust store path.
* @return the path. null if not set.
*/
public String getTrustStore() {
return getSslOption("com.ibm.ssl.trustStore");
}
/**
* Set the SSL trust store password.
* <p>
* Required if the trust store path is set.
*
* @param password the password
*/
public void setTrustStorePassword(char[] password) {
setSslOption("com.ibm.ssl.trustStorePassword", new String(password));
}
/**
* Get the SSL trust store path password.
* @return the password. null if not set.
*/
public char[] getTrustStorePassword() {
String s = getSslOption("com.ibm.ssl.trustStorePassword");
return s == null ? null : s.toCharArray();
}
/**
* Set the SSL key store path.
* <p>
* Only used with "ssl:" serverURL when the server is configured for
* client auth.
* Path to trust store file in JKS format.
* If not set, the standard JRE and javax.net.ssl system properties
* control the SSL behavior.
* @param path the path. null to unset.
*/
public void setKeyStore(String path) {
setSslOption("com.ibm.ssl.keyStore", path);
}
/**
* Get the SSL trust store path.
* @return the path. null if not set.
*/
public String getKeyStore() {
return getSslOption("com.ibm.ssl.keyStore");
}
/**
* Set the SSL key store password.
* <p>
* Required if the key store path is set.
*
* @param password the password. null to unset.
*/
public void setKeyStorePassword(char[] password) {
setSslOption("com.ibm.ssl.keyStorePassword", new String(password));
}
/**
* Get the SSL key store path password.
* @return the password. null if not set.
*/
public char[] getKeyStorePassword() {
String s = getSslOption("com.ibm.ssl.keyStorePassword");
return s == null ? null : s.toCharArray();
}
// paho MqttConnectOptions.setSslProperties() doesn't support this control
// /**
// * Set the SSL key password.
// * <p>
// * Defaults to using the key store password if not set.
// *
// * @param password the password. null to unset.
// */
// public void setKeyPassword(char[] password) {
// setSslOption("com.ibm.ssl.XXXX-NO-SUCH-KEY-XXXX.keyPassword", new String(password));
// }
//
// public char[] getKeyPassword() {
// String s = getSslOption("com.ibm.ssl.XXXX-NO-SUCH-KEY-XXXX.keyPassword");
// return s == null ? null : s.toCharArray();
// }
// paho MqttConnectOptions.setSslProperties() doesn't support this control
// /**
// * Set the SSL key certificate alias.
// * <p>
// * Defaults to using "default" if not set.
// * @param alias the alias. null to unset.
// */
// public void setKeyCertificateAlias(String alias) {
// setSslOption("com.ibm.ssl.XXXX-NO-SUCH-KEY-XXXX.keyCertificateAlias", alias);
// }
//
// public String getKeyCertificateAlias() {
// return getSslOption("com.ibm.ssl.XXXX-NO-SUCH-KEY-XXXX.keyCertificateAlias");
// }
/**
* INTERNAL USE ONLY.
* @return object
*/
public Object options() {
return options;
}
}