<!DOCTYPE html><html lang="en"><head><meta charset="utf-8"></meta><title>ConsumeMQTT</title><link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"></link></head><script type="text/javascript">window.onload = function(){if(self==top) { document.getElementById('nameHeader').style.display = "inherit"; } }</script><body><h1 id="nameHeader" style="display: none;">ConsumeMQTT</h1><h2>Description: </h2><p>Subscribes to a topic and receives messages from an MQTT broker</p><p><a href="additionalDetails.html">Additional Details...</a></p><h3>Tags: </h3><p>subscribe, MQTT, IOT, consume, listen</p><h3>Properties: </h3><p>In the list below, the names of required properties appear in <strong>bold</strong>. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the <a href="../../../../../html/expression-language-guide.html">NiFi Expression Language</a>.</p><table id="properties"><tr><th>Display Name</th><th>API Name</th><th>Default Value</th><th>Allowable Values</th><th>Description</th></tr><tr><td id="name"><strong>Broker URI</strong></td><td>Broker URI</td><td></td><td id="allowable-values"></td><td id="description">The URI(s) to use to connect to the MQTT broker (e.g., tcp://localhost:1883). The 'tcp', 'ssl', 'ws' and 'wss' schemes are supported. In order to use 'ssl', the SSL Context Service property must be set. 
When a comma-separated URI list is set (e.g., tcp://localhost:1883,tcp://localhost:1884), the processor will use a round-robin algorithm to connect to the brokers on connection failure.<br/><strong>Supports Expression Language: true (will be evaluated using variable registry only)</strong></td></tr><tr><td id="name"><strong>MQTT Specification Version</strong></td><td>MQTT Specification Version</td><td id="default-value">v3 AUTO</td><td id="allowable-values"><ul><li>v3 AUTO <img src="../../../../../html/images/iconInfo.png" alt="Start with v3.1.1 and fallback to v3.1.0 if not supported by a broker" title="Start with v3.1.1 and fallback to v3.1.0 if not supported by a broker"></img></li><li>v5.0</li><li>v3.1.1</li><li>v3.1.0</li></ul></td><td id="description">The MQTT specification version when connecting with the broker. See the allowable value descriptions for more details.</td></tr><tr><td id="name">Username</td><td>Username</td><td></td><td id="allowable-values"></td><td id="description">Username to use when connecting to the broker<br/><strong>Supports Expression Language: true (will be evaluated using variable registry only)</strong></td></tr><tr><td id="name">Password</td><td>Password</td><td></td><td id="allowable-values"></td><td id="description">Password to use when connecting to the broker<br/><strong>Sensitive Property: true</strong></td></tr><tr><td id="name">SSL Context Service</td><td>SSL Context Service</td><td></td><td id="allowable-values"><strong>Controller Service API: </strong><br/>SSLContextService<br/><strong>Implementations: </strong><a href="../../../nifi-ssl-context-service-nar/1.19.1/org.apache.nifi.ssl.StandardRestrictedSSLContextService/index.html">StandardRestrictedSSLContextService</a><br/><a href="../../../nifi-ssl-context-service-nar/1.19.1/org.apache.nifi.ssl.StandardSSLContextService/index.html">StandardSSLContextService</a></td><td id="description">The SSL Context Service used to provide client certificate information for TLS/SSL 
connections.</td></tr><tr><td id="name"><strong>Session state</strong></td><td>Session state</td><td id="default-value">Clean Session</td><td id="allowable-values"><ul><li>Clean Session <img src="../../../../../html/images/iconInfo.png" alt="Client and Server discard any previous session and start a new one. This session lasts as long as the network connection. State data associated with this session is not reused in any subsequent session" title="Client and Server discard any previous session and start a new one. This session lasts as long as the network connection. State data associated with this session is not reused in any subsequent session"></img></li><li>Resume Session <img src="../../../../../html/images/iconInfo.png" alt="Server resumes communications with the client based on state from the current session (as identified by the ClientID). The client and server store the session after the client and server are disconnected. After the disconnection of a session that was not a clean session, the server stores further QoS 1 and QoS 2 messages that match any subscriptions that the client had at the time of disconnection as part of the session state" title="Server resumes communications with the client based on state from the current session (as identified by the ClientID). The client and server store the session after the client and server are disconnected. After the disconnection of a session that was not a clean session, the server stores further QoS 1 and QoS 2 messages that match any subscriptions that the client had at the time of disconnection as part of the session state"></img></li></ul></td><td id="description">Whether to start a fresh session or resume the previous one. 
See the allowable value descriptions for more details.</td></tr><tr><td id="name">Session Expiry Interval</td><td>Session Expiry Interval</td><td id="default-value">24 hrs</td><td id="allowable-values"></td><td id="description">After this interval, the broker will expire the client and clear the session state.<br/><br/><strong>This Property is only considered if all of the following conditions are met:</strong><ul><li>The [MQTT Specification Version] Property has a value of "v5.0".</li><li>The [Session state] Property has a value of "Resume Session".</li></ul></td></tr><tr><td id="name">Client ID</td><td>Client ID</td><td></td><td id="allowable-values"></td><td id="description">MQTT client ID to use. If not set, a UUID will be generated.<br/><strong>Supports Expression Language: true (will be evaluated using variable registry only)</strong></td></tr><tr><td id="name">Group ID</td><td>Group ID</td><td></td><td id="allowable-values"></td><td id="description">MQTT consumer group ID to use. If the group ID is not set, the client will connect as an individual consumer.</td></tr><tr><td id="name"><strong>Topic Filter</strong></td><td>Topic Filter</td><td></td><td id="allowable-values"></td><td id="description">The MQTT topic filter that designates the topics to subscribe to.<br/><strong>Supports Expression Language: true (will be evaluated using variable registry only)</strong></td></tr><tr><td id="name"><strong>Quality of Service (QoS)</strong></td><td>Quality of Service(QoS)</td><td id="default-value">0 - At most once</td><td id="allowable-values"><ul><li>0 - At most once <img src="../../../../../html/images/iconInfo.png" alt="Best effort delivery. A message won’t be acknowledged by the receiver or stored and redelivered by the sender. This is often called “fire and forget” and provides the same guarantee as the underlying TCP protocol." title="Best effort delivery. A message won’t be acknowledged by the receiver or stored and redelivered by the sender. 
This is often called “fire and forget” and provides the same guarantee as the underlying TCP protocol."></img></li><li>1 - At least once <img src="../../../../../html/images/iconInfo.png" alt="Guarantees that a message will be delivered at least once to the receiver. The message can also be delivered more than once" title="Guarantees that a message will be delivered at least once to the receiver. The message can also be delivered more than once"></img></li><li>2 - Exactly once <img src="../../../../../html/images/iconInfo.png" alt="Guarantees that each message is received only once by the counterpart. It is the safest and also the slowest quality of service level. The guarantee is provided by two round-trip flows between sender and receiver." title="Guarantees that each message is received only once by the counterpart. It is the safest and also the slowest quality of service level. The guarantee is provided by two round-trip flows between sender and receiver."></img></li></ul></td><td id="description">The Quality of Service (QoS) to receive the message with. 
Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.</td></tr><tr><td id="name">Record Reader</td><td>record-reader</td><td></td><td id="allowable-values"><strong>Controller Service API: </strong><br/>RecordReaderFactory<br/><strong>Implementations: </strong><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.grok.GrokReader/index.html">GrokReader</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.xml.XMLReader/index.html">XMLReader</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.avro.AvroReader/index.html">AvroReader</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.syslog.Syslog5424Reader/index.html">Syslog5424Reader</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.cef.CEFReader/index.html">CEFReader</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.syslog.SyslogReader/index.html">SyslogReader</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.json.JsonTreeReader/index.html">JsonTreeReader</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.csv.CSVReader/index.html">CSVReader</a><br/><a href="../../../nifi-scripting-nar/1.19.1/org.apache.nifi.record.script.ScriptedReader/index.html">ScriptedReader</a><br/><a href="../../../nifi-parquet-nar/1.19.1/org.apache.nifi.parquet.ParquetReader/index.html">ParquetReader</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.json.JsonPathReader/index.html">JsonPathReader</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.lookup.ReaderLookup/index.html">ReaderLookup</a><br/><a 
href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.windowsevent.WindowsEventLogReader/index.html">WindowsEventLogReader</a></td><td id="description">The Record Reader to use for parsing received MQTT Messages into Records.</td></tr><tr><td id="name">Record Writer</td><td>record-writer</td><td></td><td id="allowable-values"><strong>Controller Service API: </strong><br/>RecordSetWriterFactory<br/><strong>Implementations: </strong><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.text.FreeFormTextRecordSetWriter/index.html">FreeFormTextRecordSetWriter</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.avro.AvroRecordSetWriter/index.html">AvroRecordSetWriter</a><br/><a href="../../../nifi-scripting-nar/1.19.1/org.apache.nifi.record.script.ScriptedRecordSetWriter/index.html">ScriptedRecordSetWriter</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.csv.CSVRecordSetWriter/index.html">CSVRecordSetWriter</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.xml.XMLRecordSetWriter/index.html">XMLRecordSetWriter</a><br/><a href="../../../nifi-parquet-nar/1.19.1/org.apache.nifi.parquet.ParquetRecordSetWriter/index.html">ParquetRecordSetWriter</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.json.JsonRecordSetWriter/index.html">JsonRecordSetWriter</a><br/><a href="../../../nifi-record-serialization-services-nar/1.19.1/org.apache.nifi.lookup.RecordSetWriterLookup/index.html">RecordSetWriterLookup</a></td><td id="description">The Record Writer to use for serializing Records before writing them to a FlowFile.</td></tr><tr><td id="name"><strong>Add attributes as fields</strong></td><td>add-attributes-as-fields</td><td id="default-value">true</td><td id="allowable-values"><ul><li>true</li><li>false</li></ul></td><td id="description">If setting this property to true, default fields 
are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.<br/><br/><strong>This Property is only considered if </strong><strong>the [Record Reader] Property has a value specified.</strong></td></tr><tr><td id="name">Message Demarcator</td><td>message-demarcator</td><td></td><td id="allowable-values"></td><td id="description">With this property, you have the option to output FlowFiles that contain multiple messages. This property allows you to provide a string (interpreted as UTF-8) to use for demarcating multiple messages. This property is optional; if it is not provided and no Record Reader/Writer is defined, each message received will result in a single FlowFile. To enter a special character such as 'new line', use CTRL+Enter or Shift+Enter, depending on the OS.<br/><strong>Supports Expression Language: true (will be evaluated using variable registry only)</strong></td></tr><tr><td id="name">Connection Timeout (seconds)</td><td>Connection Timeout (seconds)</td><td id="default-value">30</td><td id="allowable-values"></td><td id="description">Maximum time interval the client will wait for the network connection to the MQTT server to be established. The default timeout is 30 seconds. A value of 0 disables timeout processing, meaning the client will wait until the network connection is made successfully or fails.</td></tr><tr><td id="name">Keep Alive Interval (seconds)</td><td>Keep Alive Interval (seconds)</td><td id="default-value">60</td><td id="allowable-values"></td><td id="description">Defines the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available, without having to wait for the TCP/IP timeout. The client will ensure that at least one message travels across the network within each keep alive period. In the absence of a data-related message during the time period, the client sends a very small "ping" message, which the server will acknowledge. 
A value of 0 disables keepalive processing in the client.</td></tr><tr><td id="name">Last Will Message</td><td>Last Will Message</td><td></td><td id="allowable-values"></td><td id="description">The message to send as the client's Last Will.</td></tr><tr><td id="name"><strong>Last Will Topic</strong></td><td>Last Will Topic</td><td></td><td id="allowable-values"></td><td id="description">The topic to send the client's Last Will to.<br/><br/><strong>This Property is only considered if </strong><strong>the [Last Will Message] Property has a value specified.</strong></td></tr><tr><td id="name"><strong>Last Will Retain</strong></td><td>Last Will Retain</td><td id="default-value">false</td><td id="allowable-values"><ul><li>true</li><li>false</li></ul></td><td id="description">Whether to retain the client's Last Will.<br/><br/><strong>This Property is only considered if </strong><strong>the [Last Will Message] Property has a value specified.</strong></td></tr><tr><td id="name"><strong>Last Will QoS Level</strong></td><td>Last Will QoS Level</td><td id="default-value">0 - At most once</td><td id="allowable-values"><ul><li>0 - At most once <img src="../../../../../html/images/iconInfo.png" alt="Best effort delivery. A message won’t be acknowledged by the receiver or stored and redelivered by the sender. This is often called “fire and forget” and provides the same guarantee as the underlying TCP protocol." title="Best effort delivery. A message won’t be acknowledged by the receiver or stored and redelivered by the sender. This is often called “fire and forget” and provides the same guarantee as the underlying TCP protocol."></img></li><li>1 - At least once <img src="../../../../../html/images/iconInfo.png" alt="Guarantees that a message will be delivered at least once to the receiver. The message can also be delivered more than once" title="Guarantees that a message will be delivered at least once to the receiver. 
The message can also be delivered more than once"></img></li><li>2 - Exactly once <img src="../../../../../html/images/iconInfo.png" alt="Guarantees that each message is received only once by the counterpart. It is the safest and also the slowest quality of service level. The guarantee is provided by two round-trip flows between sender and receiver." title="Guarantees that each message is received only once by the counterpart. It is the safest and also the slowest quality of service level. The guarantee is provided by two round-trip flows between sender and receiver."></img></li></ul></td><td id="description">QoS level to be used when publishing the Last Will Message.<br/><br/><strong>This Property is only considered if </strong><strong>the [Last Will Message] Property has a value specified.</strong></td></tr><tr><td id="name"><strong>Max Queue Size</strong></td><td>Max Queue Size</td><td></td><td id="allowable-values"></td><td id="description">MQTT messages are always sent to subscribers on a topic, regardless of how frequently the processor is scheduled to run. If the 'Run Schedule' is significantly behind the rate at which messages arrive at this processor, a backlog can build up in the internal queue of this processor. This property specifies the maximum number of messages this processor will hold in memory at one time in the internal queue. 
This data will be lost if NiFi restarts.</td></tr></table><h3>Relationships: </h3><table id="relationships"><tr><th>Name</th><th>Description</th></tr><tr><td>parse.failure</td><td>If a message cannot be parsed using the configured Record Reader, the contents of the message will be routed to this Relationship as its own individual FlowFile.</td></tr><tr><td>Message</td><td>The MQTT message output</td></tr></table><h3>Reads Attributes: </h3>None specified.<h3>Writes Attributes: </h3><table id="writes-attributes"><tr><th>Name</th><th>Description</th></tr><tr><td>record.count</td><td>The number of records received</td></tr><tr><td>mqtt.broker</td><td>MQTT broker that was the message source</td></tr><tr><td>mqtt.topic</td><td>MQTT topic on which message was received</td></tr><tr><td>mqtt.qos</td><td>The quality of service for this message.</td></tr><tr><td>mqtt.isDuplicate</td><td>Whether or not this message might be a duplicate of one which has already been received.</td></tr><tr><td>mqtt.isRetained</td><td>Whether or not this message was from a current publisher, or was "retained" by the server as the last message published on the topic.</td></tr></table><h3>State management: </h3>This component does not store state.<h3>Restricted: </h3>This component is not restricted.<h3>Input requirement: </h3>This component does not allow an incoming relationship.<h3>System Resource Considerations:</h3><table id="system-resource-considerations"><tr><th>Resource</th><th>Description</th></tr><tr><td>MEMORY</td><td>The 'Max Queue Size' property specifies the maximum number of messages that a single instance of this processor can hold in memory. A high value for this property could result in a lot of data being stored in memory.</td></tr></table><h3>See Also:</h3><p><a href="../org.apache.nifi.processors.mqtt.PublishMQTT/index.html">PublishMQTT</a></p></body></html>