| <!DOCTYPE html> |
| <html lang="en"> |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| http://www.apache.org/licenses/LICENSE-2.0 |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| <head> |
| <meta charset="utf-8"/> |
| <title>ExecuteStateless</title> |
| <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/> |
| </head> |
| |
| <body> |
| |
| <h1>Introduction</h1> |
| <p> |
| With any sufficiently complex system, the designers and maintainers must make tradeoffs. Apache NiFi is no exception. |
| </p> |
| |
| <p> |
| NiFi is geared toward being run in an environment in which it is free to consume virtually all system resources, |
| especially CPU, disk, and network bandwidth. It is designed in such a way that data is pulled from a source system, optionally filtered, routed, |
| and transformed, before ultimately being published to zero or more destinations. Moreover, the architecture lends itself best to situations in |
| which the source and destinations of a particular piece of data (FlowFile) are themselves loosely coupled. |
| </p> |
| |
| <p> |
| As such, NiFi stores all FlowFile content on disk in order to be resilient across restarts. It provides backpressure so that data consumers |
| avoid overwhelming the system if the data publishers/producers are not able to keep up for some amount of time. It provides the ability to |
| assign more resources to individual parts of a dataflow (via the Concurrent Tasks configuration). |
| </p> |
| |
| <p> |
| All of these design decisions have served NiFi well, making it a leading platform for data integration. However, there are some use cases |
| which lend themselves better to a slightly different architecture than what is provided by traditional NiFi. |
| </p> |
| |
| <p> |
| For example, some use cases are better served by an architecture in which data is not persisted across restarts. Where, instead of storing the |
| data that has been received, the user knows that the data source is both persistent and replayable. In such a situation, it might make more sense |
| to avoid persisting the data and instead source the data anew after restart. This would provide an advantage in that data could potentially be stored |
| in memory instead of on disk, which can provide better performance. Additionally, it provides the ability to move the processing from one machine |
| to another machine without needing to worry about data loss. |
| </p> |
| |
| <p> |
| Stateless NiFi provides a different Runtime Engine than traditional NiFi. It is a single-threaded runtime engine, in which data is not persisted across |
| restarts. Additionally, the data that is sourced can be processed through the entire chain of processors in the dataflow before it is ever even |
| acknowledged from the source. The README document for NiFi Stateless provides far more context as to the differences between traditional NiFi |
| and Stateless NiFi, and you are encouraged to read through it in order to gain a better understanding of the different tradeoffs that were made for the |
| Stateless architecture. |
| </p> |
| |
| <p> |
| Both the traditional NiFi Runtime Engine and the Stateless NiFi Runtime Engine have their strengths and weaknesses. The ideal situation would be one |
| in which users had the ability to easily choose which parts of their dataflow run Stateless and which parts run in the traditional NiFi Runtime Engine. |
| </p> |
| |
| <p> |
| The ExecuteStateless Processor makes this possible. |
| </p> |
| |
| |
| |
| |
| <h1>Configuration</h1> |
| |
| <p> |
| In order to use the ExecuteStateless Processor, the most important configuration element is the flow definition. That is, where to find that dataflow |
| that is to be run using the Stateless Engine. |
| </p> |
| |
| <h2>Flow Definition</h2> |
| <p> |
| The Processor allows the dataflow to be stored in a local file (i.e., a file local to the NiFi server), |
| a URL that is accessible from the NiFi server, or in a NiFi Registry. Once the flow has been fetched, it is cached in the configured <code>Working Directory</code> |
| for resiliency purposes. If NiFi or the ExecuteStateless Processor is stopped and restarted, we do not want to add a single point of failure by relying |
| on some external service to be available. As a result, when the Processor is started, it will first attempt to fetch the flow from the configured location. |
| If unable to do so, it will load the dataflow from the cache, if it is available. |
| </p> |
| |
| |
| <h2>Ports</h2> |
| <p> |
| Depending on the dataflow that is to be run, it may obtain its data from some external source, such as a JMS Broker via the ConsumeJMS processor. |
| Or, it may take in data from another point in the NiFi flow. In order to do this, the dataflow must be created with an Input Port |
| at the root level of the dataflow. The ExecuteStateless processor is then able to be configured with an incoming connection. When the processor is |
| triggered to run, it will take one FlowFile from the incoming connection and enqueue it into the stateless dataflow for the configured Port. |
| If the Processor is configured to have an incoming Connection, the Input Port property must also be configured, unless there is exactly one |
| Input Port in the dataflow. |
| </p> |
| |
| <p> |
| Similarly, after completing its processing, the stateless flow may route data to one or more Output Ports. Data routed to these Output Ports |
| will then be transferred to the <code>output</code> relationship of the ExecuteStateless Processor. Any FlowFile routed to the <code>output</code> |
| relationship will also have an attribute added to it named "output.port.name" which can be used to route the data if necessary. |
| </p> |
| |
| <p> |
| It is a common practice, however, to have ports that use a naming convention such as "success" and "failure." It may not make sense to have the |
| dataflow take in a FlowFile into its Input Port, perform some processing, and route 1 FlowFile to "success" and route another to "failure" and then |
| to have all of the FlowFile transferred to the <code>output</code> relationship together. We are likely to want to consider the processing of the |
| incoming FlowFile to be a failure if any FlowFile makes its way to the "failure" port. In such a case, we want nothing to go to the "output" relationship, |
| and we want the incoming FlowFile instead to be routed to the "failure" relationship of ExecuteStateless. We can accomplish this by simplify providing |
| a comma-separated list of the Output Ports in the dataflow that should be considered a "failure." |
| </p> |
| |
| |
| |
| <h1>Success and Failure</h1> |
| |
| <p> |
| If the ExecuteStateless Processor is configured with an incoming connection, the data will be transferred to one of three relationships: |
| "original," "failure," or "timeout." |
| </p> |
| |
| <p> |
| When the dataflow is triggered, it will have up to the configured amount of time to complete its processing. This time period is configured via |
| the "Dataflow Timeout" property. If the dataflow has not completed in the allotted amount of time, the dataflow is canceled, and the input FlowFile |
| is routed to the "timeout" relationship. |
| </p> |
| |
| <p> |
| If any Processor within the dataflow throws an Exception that it does not handle, the dataflow is considered a failure, and the input FlowFile |
| will be routed to the "failure" relationship. |
| </p> |
| |
| <p> |
| Additionally, if any FlowFile is routed to one of the Ports whose name is defined in the "Failure Ports" property of ExecuteStateless, the dataflow |
| is considered a failure. In this case, an attribute named "failure.port.name" is added to the FlowFile, as there may be multiple ports that are |
| considered failures, and this can be used in order to differentiate between them. |
| </p> |
| |
| <p> |
| Otherwise, the incoming FlowFile will be routed to the "original" relationship, and any FlowFiles routed to any Output Port of the dataflow will be |
| transferred to the "output" relationship of the ExecuteStateless Processor. All FlowFiles transferred to the "output" relationship will also have an |
| attribute named "output.port.name." |
| </p> |
| |
| |
| <h1>Designing Flows for Stateless</h1> |
| |
| <p> |
| When designing a flow to use with Stateless, it is important to consider how the flow might want to receive its data and what it might want |
| to do with the data once it is processed. One option is for the flow to fully encapsulate the source of data and all destinations. For example, |
| it might have a ConsumeKafkaRecord processor, perform some processing, and then publish to another topic via PublishKafkaRecord. |
| </p> |
| |
| <p> |
| Another option would be to build a flow that source data from some external source, possibly perform some processing, but not define where the destination |
| of the data. For example, the flow might consist of a ConsumeKafkaRecord_2_6 processor and perform some filtering and transformation, but stop short |
| of publishing the data anywhere. Instead, it can transfer the data to an Output Port, which could then be used by ExecuteStateless to bring that |
| data into the NiFi dataflow. |
| </p> |
| |
| <p> |
| Similarly, a dataflow may not define where it receives its input from, and instead just use an Input Port, so that any dataflow can be built to source |
| data, and then deliver it to this dataflow, which is responsible for preparing and delivering the data. |
| </p> |
| |
| <p> |
| Finally, the dataflow may define neither the source nor the destination of the data. Instead, the dataflow will be built to use an |
| Input Port, it will perform some filtering/routing/transformation, and finally provide its processing results to an Output Port. |
| </p> |
| |
| |
| <h2>Input Ports</h2> |
| <p> |
| When designing a Stateless dataflow, it is recommended to use zero or one Input Port. It is possible, however, to define multiple Input Ports. |
| In this case, ExecuteStateless Processor needs to be configured by setting the Input Port property in order to dictate which of those Input Ports |
| the incoming data should be transferred to. Note that the property expects the NAME of the Port and not the identifier. It is also important to note |
| that the name is case sensitive. |
| </p> |
| |
| |
| <h2>Output Ports</h2> |
| <p> |
| While it is recommended not to use more than one Input Port, it often makes sense to make use of multiple Output Ports. For example, consider a dataflow |
| that takes in CSV data representing information about book sales. The dataflow then partitions the data into "large sales" and "small sales," performs |
| some enrichment, and converts the results into JSON. This dataflow might have four different output ports: "Input CSV," "Large Sales," "Small Sales," |
| and "Failure." |
| </p> |
| |
| |
| |
| <h2>Parameters</h2> |
| <p> |
| When we build a dataflow, it is often important that we not run the flow with the exact same configuration in every situation. For example, if we are |
| consuming from Kafka, we may want to parameterize the Kafka Brokers, and the name of the Topic. This is best done by making use of Parameters when |
| building the dataflow. |
| </p> |
| |
| <p> |
| Once some value has been parameterized, though, we must have some way of conveying values for those parameters to the ExecuteStateless Processor. |
| To do this, we use user-defined properties. When configuring the ExecuteStateless Processor, in the Properties tab, we can click the '+' icon in the |
| top-right. This allows us to add a custom property to the Processor. Whatever is used for the name and value of that property will be used as the name |
| and value of a parameter in the flow. |
| </p> |
| |
| <p> |
| For example, if our dataflow references a Parameter named "Kafka Topic" and we want to run our dataflow using a value of "book-sales," then we can add |
| a property to ExecuteStateless with the name "Kafka Topic" and the value "book-sales." |
| </p> |
| |
| |
| |
| <h1>Exposing the Dataflow</h1> |
| |
| <p> |
| Now that we've discussed some considerations for building our dataflow, we must consider how we can expose the dataflow, or make the dataflow available |
| to the ExecuteStateless processor. |
| </p> |
| |
| <p> |
| We have three options for this. Firstly, we can right-click on the Process Group that we want to expose, and choose to add the Process Group to Version Control |
| by adding it to the NiFi Registry. This is the recommended approach. |
| </p> |
| |
| <p> |
| However, we can also right-click on the Process Group and instead choose to "Download flow definition." At this point, we can copy the flow definition |
| JSON file to every node in our cluster. Or, alternatively, we can upload the flow definition to some location that is accessible via a URL from every |
| node in the cluster. For example, we might choose to check the JSON file into a Git repository and provide the URL to that file to the processor. |
| </p> |
| |
| <p> |
| It is worth noting that if we define the location of the dataflow to be some external URL or to live within the NiFi Registry, we don't want to add a |
| dependency on that external service to be available and accessible. As a result, when the dataflow is downloaded, it will be cached in the configured |
| <code>Working Directory</code> and if unable to access the flow at some later time, that cached version will be used. |
| </p> |
| |
| <p> |
| It is also important to note that when using an external URL or NiFi Registry, if the Processor is stopped and started (or NiFi is restarted), it is possible that |
| the dataflow could be different than the last time that it ran. Additionally, it's possible that two nodes in a cluster could potentially be running |
| a different version of the flow if they downloaded the file at different times (or if a different file were copied to the nodes). When using NiFi Registry, this can be |
| avoided by explicitly specifying the version of the flow to run. |
| </p> |
| |
| |
| |
| <h1>Surfacing NiFi Concepts</h1> |
| |
| <p> |
| Because this one processor is responsible for internally running an entire dataflow, there are several concepts that must be taken into consideration. |
| </p> |
| |
| <h2>Data Provenance</h2> |
| <p> |
| Throughout the course of a dataflow, many different intermediate FlowFiles may be created, destroyed, transformed, delivered, and fetched. |
| While traditional NiFi will emit Provenance events for each of these, it is not currently possible with the ExecuteStateless Processor. |
| Because those intermediate FlowFiles are not available, we cannot surface Provenance Events that are emitted by the dataflow, such as SEND |
| and RECEIVE events, because the FlowFiles that were sent and received are not available. |
| </p> |
| |
| <p> |
| Any FlowFile that is transferred to the "output" relationship will be shown as a CREATE Provenance event if there is no input FlowFile. |
| If there is an input FlowFile, those FlowFiles will be shown as FORK events, the child having forked from the incoming FlowFile. |
| </p> |
| |
| |
| <h2>Counters</h2> |
| |
| <p> |
| If any component within the stateless dataflow adjusts a counter, the counters that are adjusted are surfaced as counters for the ExecuteStateless |
| Processor. Consider a dataflow that takes in a single FlowFile and partitions it into two FlowFiles, which are then sent to different Output Ports. |
| Also consider that the partitioning is performed by a PartitionRecord processor with name PartitionData and ID 167ed9c3-a954-3dba-b6fd-c2e1a4572287. |
| Then, we may see a counter for the ExecuteStateless processor with a name "Records Processed - PartitionData (167ed9c3-a954-3dba-b6fd-c2e1a4572287)." |
| This is because the PartitionRecord Processor updates a counter with the name "Records Processed." The additional name and ID of the Processor |
| are added in order to give context. |
| </p> |
| |
| <p> |
| The above mentioned counter, though, will only be incremented for successful invocations of the dataflow. It may be helpful to understand how many |
| times the counter was updated for failed attempts, also. However, we don't want to combine the counters for successful and failed attempts, because |
| that can lead to confusion. Therefore, if the PartitionRecord processor is successful and updates the counter, but the dataflow fails (for example, |
| a FlowFile is then routed to a Failure Port), the ExecuteStateless processor will now have two counters: |
| "Records Processed - PartitionData (167ed9c3-a954-3dba-b6fd-c2e1a4572287)" and |
| "Records Processed - PartitionData (167ed9c3-a954-3dba-b6fd-c2e1a4572287) (Failed attempts)." |
| </p> |
| |
| |
| |
| <h2>Bulletins</h2> |
| <p> |
| We must also consider how bulletins from Processors within the stateless flow get surfaced to the ExecuteStateless processor. This can be helpful |
| for indicating that some concerning behavior is taking place. Any bulletin that is created while running the stateless flow that is at a level of |
| WARNING or ERROR will result in a bulletin being created by the ExecuteStateless Processor (assuming that the ExecuteStateless Processor's Bulletin |
| Level is set sufficiently high in its Settings tab). |
| </p> |
| |
| |
| <h2>Yielding</h2> |
| <p> |
| Similarly, if any Processor in the Stateless flow chooses to yield, the ExecuteStateless processor will yield. This is important if there are source |
| or destination systems that the Stateless flow is unable to communicate with or that have no more data to offer, as this allows us to avoid constantly |
| interacting with that external service, which could add significant load to it. |
| </p> |
| |
| |
| |
| |
| <h1>Performance Considerations</h1> |
| <p> |
| There are a few different performance-related considerations to take into effect when configuring the ExecuteStateless Processor. |
| </p> |
| |
| |
| <h2>Content Storage Strategy</h2> |
| <p> |
| One of the most impactful configuration options for the Processor is the configuration of the "Content Storage Strategy" property. For performance |
| reasons, the processor can be configured to hold all FlowFiles in memory. This includes incoming FlowFiles, as well as intermediate and output FlowFiles. |
| This can be a significant performance improvement but comes with a significant risk. The content is stored on NiFi's heap. This is the same heap that is shared |
| by all other ExecuteStateless flows and by NiFi's processors and the NiFi process itself. If the data is very large, it can quickly exhaust the heap, resulting |
| in Out Of Memory Errors in NiFi. These, in turn, can result in poor performance, as well as instability of the NiFi process itself. For this reason, it is not |
| recommended to use the "Store Content on Heap" option unless it is known that all FlowFiles will be small (less than a few MB). And in order to help safeguard |
| against the case that the Processor receives an unexpectedly large FlowFile, the "Max Input FlowFile Size" property must be configured when storing data on the heap. |
| </p> |
| |
| <p> |
| Alternatively, and by default, the "Content Storage Strategy" can be configured to store FlowFile content on disk. When this option is used, the content of all |
| FlowFiles is stored in the configured <code>Working Directory</code>. It is important to note, however, that this data is not meant to be persisted across restarts. |
| Instead, this simply provides the Stateless Engine with a way to avoid loading everything into memory. Upon restart, the data will be deleted instead of allowing |
| FlowFiles to resume from where they left off. |
| </p> |
| |
| |
| <h2>Concurrent Tasks</h2> |
| <p> |
| As noted before, the Stateless Engine is single-threaded. However, the processor does allow the user to configure more than one concurrent task. In this situation, |
| each thread/concurrent task will run its own instance of the dataflow. This functions in much the same way as if a single thread were run on each of many different computers. |
| Any internal state that is stored by the processor, such as the creation of a client for interacting with another service, is not shared. Additionally, if any Processors are |
| configured to run on Primary Node only, they will run for each instance. |
| </p> |
| |
| |
| <h2>Run Duration</h2> |
| <p> |
| This Processor supports the configuration of NiFi's Run Duration in the Scheduling tab. If the Processor is expected to process many small FlowFiles, it is recommended to configure |
| this option so that the Processor has a Run Duration of "25 ms." Typically, adjusting the value beyond that offers little benefit, but adjusting from "0 ms" to "25 ms" can |
| provide a very significant performance improvement for many dataflows, at the cost of potentially introducing up to 25 milliseconds worth of additional latency. |
| </p> |
| |
| </body> |
| </html> |