| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| = MQTT Streamer |
| |
| == Overview |
| |
| This streamer consumes from an MQTT topic and feeds key-value pairs into an `IgniteDataStreamer` instance, using |
| https://eclipse.org/paho/[Eclipse Paho, window=_blank] as an MQTT client. |
| |
| You need to provide a stream tuple extractor (either a single-entry or multiple-entries extractor) to process the incoming |
| message and extract the tuple to insert. |
| |
| This streamer supports: |
| |
| * Subscribing to a single topic or multiple topics at once. |
| * Specifying the subscriber's QoS for a single topic or for multiple topics. |
| * Setting https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttConnectOptions.html[MqttConnectOptions, window=_blank] |
| to enable features like _last will testament_, _persistent sessions_, etc. |
| * Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user does not provide one. |
| * (Re-)Connection retries powered by the https://github.com/rholder/guava-retrying[guava-retrying library, window=_blank]. |
| _Retry wait_ and _retry stop_ policies can be configured. |
| * Blocking the start() method until the client is connected for the first time. |
| |
| == Example |
| |
| Here's a trivial code sample showing how to use this streamer: |
| |
| [tabs] |
| -- |
| tab:Java[] |
| [source,java] |
| ---- |
| // Start Ignite. |
| Ignite ignite = Ignition.start(); |
| |
| // Get a data streamer reference. |
| IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer("mycache"); |
| |
| // Create an MQTT data streamer |
| MqttStreamer<Integer, String> streamer = new MqttStreamer<>(); |
| streamer.setIgnite(ignite); |
| streamer.setStreamer(dataStreamer); |
| streamer.setBrokerUrl(brokerUrl); |
| streamer.setBlockUntilConnected(true); |
| |
| // Set a single tuple extractor to extract items in the format 'key,value' where key => Int, and value => String |
| // (using Guava here). |
| streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<MqttMessage, Integer, String>() { |
| @Override public Map.Entry<Integer, String> extract(MqttMessage msg) { |
| List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload())); |
| |
| return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1)); |
| } |
| }); |
| |
| // Consume from multiple topics at once. |
| streamer.setTopics(Arrays.asList("def", "ghi", "jkl", "mno")); |
| |
| // Start the MQTT Streamer. |
| streamer.start(); |
| ---- |
| -- |
| |
| Refer to the Javadocs of the `ignite-mqtt-ext` module for more info on the available options. |