blob: c6b35c0cf4faef8774f7cd8bfbe124eeb6fc7d76 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.connectors.mqtt;
import java.nio.charset.StandardCharsets;
import org.apache.edgent.connectors.mqtt.runtime.MqttConnector;
import org.apache.edgent.connectors.mqtt.runtime.MqttPublisher;
import org.apache.edgent.connectors.mqtt.runtime.MqttSubscriber;
import org.apache.edgent.function.BiFunction;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.topology.TSink;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
/**
* {@code MqttStreams} is a connector to a MQTT messaging broker
* for publishing and subscribing to topics.
* <p>
* For more information about MQTT see <a href="http://mqtt.org">http://mqtt.org</a>
* <p>
* The connector exposes all MQTT capabilities:
* <ul>
* <li>multiple server URLs using tcp or ssl</li>
* <li>connection ClientId control</li>
* <li>connection username and password based authentication</li>
* <li>TODO SSL connection client authentication Q:expose just key/trust store path/pw or whole setSSLProperties capability? Related: expose setSocketFactory?</li>
* <li>connection clean session control</li>
* <li>connection keepalive control</li>
* <li>operation/action timeout control</li>
* <li>client last will and testament</li>
* <li>connection in-flight message persistence control</li>
* <li>per-published message control of topic, quality of service, payload and retain value</li>
* <li>subscription topicFilter and maximum quality of service control</li>
* <li>TODO multiple subscription topicFilters</li>
* <li>access to received message's topic and payload.
* Omitting until there's a clear demand for it: the received msg's isRetained, qos and isDuplicate values.
* <li>TODO dynamic server URLs control, operation timeout, keepalive control</li>
* <li>TODO dynamic subscription list control</li>
* <li>robust connection management / reconnected</li>
* <li>TODO fix: client's aren't gracefully disconnecting (their close() isn't getting called) issue#64</li>
* </ul>
* <p>
* Sample use:
* <pre>{@code
* Topology t = ...;
* String url = "tcp://localhost:1883";
* MqttStreams mqtt = new MqttStreams(t, url);
* TStream<String> s = top.constants("one", "two", "three");
* mqtt.publish(s, "myTopic", 0);
* TStream<String> rcvd = mqtt.subscribe("someTopic", 0);
* rcvd.print();
* }</pre>
* <P>
* {@code publish()} can be called zero or more times.
* {@code subscribe()} can be called at most once.
* <p>
* See {@link MqttConfig} for all configuration items.
* <p>
* Messages are delivered with the specified Quality of Service.
* TODO adjust the QoS-1 and 2 descriptions based on the fact that we only
* supply a MemoryPersistence class under the covers.
* <ul>
* <li>Quality of Service 0 - a message should be delivered at most once
* (zero or one times) - "fire and forget".
* This is the fastest QoS but should only be used for messages which
* are not valuable.</li>
* <li>Quality of Service 1 - a message should be delivered at least once
* (zero or more times).
* The message will be acknowledged across the network.</li>
* <li>Quality of Service 2 = a message should be delivered once.
* Delivery is subject to two-phase acknowledgment across the network.</li>
* </ul>
* For {@code subscribe()}, the QoS is the maximum QoS used for a message.
* If a message was published with a QoS less then the subscribe's QoS,
* the message will be received with the published QoS,
* otherwise it will be received with the subscribe's QoS.
*/
public class MqttStreams {
private final MqttConnector connector;
private final Topology topology;
private int subscribeCnt;
/**
* Create a connector to the specified server.
* <p>
* A convenience function.
* Connecting to the server occurs after the
* org.apache.edgent.org.apache.edgent.topology is submitted for execution.
*
* @param topology the connector's associated {@code Topology}.
* @param url URL of MQTT server.
* @param clientId the connector's MQTT clientId. auto-generated if null.
*/
public MqttStreams(Topology topology, String url, String clientId) {
this.topology = topology;
MqttConfig config = new MqttConfig();
config.setServerURLs(new String[] {url});
config.setClientId(clientId);
connector = new MqttConnector(() -> config);
}
/**
* Create a connector with the specified configuration.
* <p>
* Connecting to the server occurs after the
* org.apache.edgent.org.apache.edgent.topology is submitted for execution.
*
* @param topology the Topology to add to
* @param config {@link MqttConfig} supplier.
*/
public MqttStreams(Topology topology, Supplier<MqttConfig> config) {
this.topology = topology;
connector = new MqttConnector(config);
}
/**
* Publish a stream's tuples as MQTT messages.
* <p>Each tuple is published as an MQTT message with
* the supplied functions providing the message topic, payload
* and QoS. The topic and QoS can be generated based upon the tuple.
*
* @param <T> Tuple type
* @param stream Stream to be published.
* @param topic function to supply the message's topic.
* @param payload function to supply the message's payload.
* @param qos function to supply the message's delivery Quality of Service.
* @param retain function to supply the message's retain value
* @return TSink sink element representing termination of this stream.
*/
public <T> TSink<T> publish(TStream<T> stream, Function<T, String> topic, Function<T, byte[]> payload,
Function<T, Integer> qos, Function<T, Boolean> retain) {
return stream.sink(new MqttPublisher<T>(connector, payload, topic, qos, retain));
}
/**
* Publish a {@code TStream<String>} stream's tuples as MQTT messages.
* <p>
* A convenience function.
* The payload of each message is the String tuple's value serialized as UTF-8.
*
* @param stream Stream to be published.
* @param topic the fixed topic.
* @param qos the fixed delivery Quality of Service.
* @param retain the fixed retain value.
* @return TSink sink element representing termination of this stream.
*/
public TSink<String> publish(TStream<String> stream, String topic, int qos, boolean retain) {
return publish(stream, tuple -> topic, tuple -> tuple.getBytes(StandardCharsets.UTF_8), tuple -> qos, tuple -> retain);
}
/**
* Subscribe to the MQTT topic(s) and create a stream of tuples of type {@code T}.
* @param <T> Tuple type
* @param topicFilter the topic(s) to subscribe to.
* @param qos the maximum Quality of Service to use.
* @param message2Tuple function to convert {@code (topic, payload)} to
* a tuple of type {@code T}
* @return {@code TStream<T>}
*/
public <T> TStream<T> subscribe(String topicFilter, int qos, BiFunction<String, byte[], T> message2Tuple) {
addSubscribe();
return topology().events(new MqttSubscriber<T>(connector, topicFilter, qos, message2Tuple));
}
/**
* Subscribe to the MQTT topic(s) and create a {@code TStream<String>}.
* <p>
* A convenience function.
* Each message's payload is expected/required to be a UTF-8 encoded string.
* Only the converted payload is present the generated tuple.
*
* @param <T> Tuple type
* @param topicFilter the topic(s) to subscribe to.
* @param qos the maximum Quality of Service to use.
* @return {@code TStream<String>}
* @see #publish(TStream, String, int, boolean)
*/
public <T> TStream<String> subscribe(String topicFilter, int qos) {
addSubscribe();
return topology().events(new MqttSubscriber<String>(connector, topicFilter, qos,
(topic, payload) -> new String(payload, StandardCharsets.UTF_8)));
}
private void addSubscribe() {
// This is in recognition of our current code that doesn't support >1 subscribe().
// The issue is MqttClient only supports a single callback and right now
// each MqttSubscriber registers itself... so only the last one wins,
// and the last subscribe() rcvs all of the msgs and the others rcv none.
// Workaround: use multiple connectors.
if (++subscribeCnt > 1)
throw new IllegalStateException("An MqttStreams instance supports at most one subscribe()");
}
/**
* Get the {@link org.apache.edgent.topology.Topology} the connector is associated with.
* @return the value
*/
public Topology topology() {
return topology;
}
}