<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>ExecuteStateless</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<h1>Introduction</h1>
<p>
With any sufficiently complex system, the designers and maintainers must make tradeoffs. Apache NiFi is no exception.
</p>
<p>
NiFi is geared toward being run in an environment in which it is free to consume virtually all system resources,
especially CPU, disk, and network bandwidth. It is designed in such a way that data is pulled from a source system, optionally filtered, routed,
and transformed, before ultimately being published to zero or more destinations. Moreover, the architecture lends itself best to situations in
which the source and destinations of a particular piece of data (FlowFile) are themselves loosely coupled.
</p>
<p>
As such, NiFi stores all FlowFile content on disk in order to be resilient across restarts. It provides backpressure so that components consuming data
from source systems do not overwhelm the system when the components publishing data to destinations are not able to keep up for some amount of time.
It also provides the ability to assign more resources to individual parts of a dataflow (via the Concurrent Tasks configuration).
</p>
<p>
All of these design decisions have served NiFi well, making it a leading platform for data integration. However, there are some use cases
which lend themselves better to a slightly different architecture than what is provided by traditional NiFi.
</p>
<p>
For example, some use cases are better served by an architecture in which data is not persisted across restarts, because the user knows that the
data source is both persistent and replayable. In such a situation, it might make more sense to avoid persisting the data that has been received and
instead fetch the data again from the source after a restart. The data could then be held in memory instead of on disk, which can provide better
performance. Additionally, the processing can be moved from one machine to another without needing to worry about data loss.
</p>
<p>
Stateless NiFi provides a different Runtime Engine than traditional NiFi. It is a single-threaded runtime engine, in which data is not persisted across
restarts. Additionally, the data that is sourced can be processed through the entire chain of processors in the dataflow before it is ever
acknowledged to the source. The README document for NiFi Stateless provides far more context as to the differences between traditional NiFi
and Stateless NiFi, and you are encouraged to read through it in order to gain a better understanding of the different tradeoffs that were made for the
Stateless architecture.
</p>
<p>
Both the traditional NiFi Runtime Engine and the Stateless NiFi Runtime Engine have their strengths and weaknesses. The ideal situation would be one
in which users had the ability to easily choose which parts of their dataflow run Stateless and which parts run in the traditional NiFi Runtime Engine.
</p>
<p>
The ExecuteStateless Processor makes this possible.
</p>
<h1>Configuration</h1>
<p>
In order to use the ExecuteStateless Processor, the most important configuration element is the flow definition: that is, where to find the dataflow
that is to be run using the Stateless Engine.
</p>
<h2>Flow Definition</h2>
<p>
The Processor can obtain the dataflow from a local file (i.e., a file local to the NiFi server),
from a URL that is accessible from the NiFi server, or from a NiFi Registry. Once the flow has been fetched, it is cached in the configured <code>Working Directory</code>
for resiliency purposes. If NiFi or the ExecuteStateless Processor is stopped and restarted, we do not want to introduce a single point of failure by relying
on some external service being available. As a result, when the Processor is started, it first attempts to fetch the flow from the configured location.
If unable to do so, it loads the dataflow from the cache, if a cached copy is available.
</p>
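<p>
The fetch-then-fall-back behavior can be sketched in Python as follows. This is an illustration under stated assumptions, not the Processor's actual
implementation: <code>load_flow_definition</code>, the <code>fetch</code> callable, and the cache file name <code>cached-flow.json</code> are all
hypothetical, and the real layout of the Working Directory may differ.
</p>
<pre><code class="language-python">import json
from pathlib import Path
from typing import Callable

# Hypothetical cache file name; the real Working Directory layout may differ.
CACHE_FILE_NAME = "cached-flow.json"


def load_flow_definition(fetch: Callable[[], str], working_dir: Path) -> dict:
    """Fetch the flow from its configured location, else fall back to the cache."""
    cache_file = working_dir / CACHE_FILE_NAME
    try:
        # fetch() stands in for reading a local file, a URL, or a NiFi Registry.
        raw = fetch()
        flow = json.loads(raw)
    except Exception as error:
        # Location unavailable (or returned invalid JSON): use the copy cached
        # by the last successful fetch, if one exists.
        if not cache_file.exists():
            raise RuntimeError("flow unavailable and no cached copy exists") from error
        return json.loads(cache_file.read_text(encoding="utf-8"))
    # Refresh the cache so that a later outage can fall back to this version.
    working_dir.mkdir(parents=True, exist_ok=True)
    cache_file.write_text(raw, encoding="utf-8")
    return flow
</code></pre>
<p>
Note that a successful fetch always overwrites the cached copy, so the fallback is the most recent version that was reachable, not necessarily the version
that is current at the source.
</p>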
<h2>Ports</h2>
<p>
Depending on the dataflow that is to be run, it may obtain its data from some external source, such as a JMS Broker via the ConsumeJMS processor.
Alternatively, it may take in data from another point in the NiFi flow. To do this, the dataflow must be created with an Input Port
at the root level of the dataflow. The ExecuteStateless Processor can then be configured with an incoming connection. When the Processor is
triggered to run, it takes one FlowFile from the incoming connection and enqueues it into the configured Input Port of the stateless dataflow.
If the Processor is configured with an incoming Connection, the Input Port property must also be configured, unless there is exactly one
Input Port in the dataflow.
</p>
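<p>
The rule for choosing the Input Port can be expressed as a small Python function. It is a sketch, not the Processor's validation code, and
<code>resolve_input_port</code> is a hypothetical name. It also applies the rule, described under Input Ports below, that ports are matched by name
and that the match is case sensitive.
</p>
<pre><code class="language-python">def resolve_input_port(input_port_property, input_port_names):
    """Pick the Input Port that receives FlowFiles from the incoming connection.

    input_port_property: value of the Input Port property, or None if unset.
    input_port_names: names of the Input Ports at the root level of the flow.
    """
    if input_port_property:
        # Ports are matched by name (not identifier), case sensitively.
        if input_port_property not in input_port_names:
            raise ValueError(f"no Input Port named {input_port_property!r}")
        return input_port_property
    if len(input_port_names) == 1:
        # Exactly one Input Port: the property may be left unset.
        return input_port_names[0]
    raise ValueError(
        f"Input Port must be set: the flow has {len(input_port_names)} Input Ports")
</code></pre>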
<p>
Similarly, after completing its processing, the stateless flow may route data to one or more Output Ports. Data routed to these Output Ports
will then be transferred to the <code>output</code> relationship of the ExecuteStateless Processor. Any FlowFile routed to the <code>output</code>
relationship will also have an attribute added to it named "output.port.name" which can be used to route the data if necessary.
</p>
<p>
It is common practice, however, to have ports that follow a naming convention such as "success" and "failure." It may not make sense for the
dataflow to take in a FlowFile through its Input Port, perform some processing, route one FlowFile to "success" and another to "failure," and then
transfer all of those FlowFiles to the <code>output</code> relationship together. Instead, we are likely to consider the processing of the
incoming FlowFile a failure if any FlowFile makes its way to the "failure" port. In that case, we want nothing to go to the "output" relationship,
and we want the incoming FlowFile to be routed to the "failure" relationship of ExecuteStateless instead. We can accomplish this by simply providing,
in the "Failure Ports" property, a comma-separated list of the Output Ports in the dataflow that should be considered a "failure."
</p>
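<p>
A minimal Python sketch of how the comma-separated Failure Ports value might be interpreted. The function names are hypothetical, and trimming
whitespace around each port name is an assumption of this sketch rather than documented behavior.
</p>
<pre><code class="language-python">def parse_failure_ports(value):
    """Split the comma-separated Failure Ports value into a set of port names."""
    # Assumption: surrounding whitespace is trimmed and empty entries are ignored.
    return {name.strip() for name in value.split(",") if name.strip()}


def triggered_failure_port(ports_that_received_data, failure_ports):
    """Return the first Failure Port that received a FlowFile, or None."""
    for port in ports_that_received_data:
        if port in failure_ports:
            return port
    return None
</code></pre>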
<h1>Success and Failure</h1>
<p>
If the ExecuteStateless Processor is configured with an incoming connection, the incoming FlowFile will be transferred to one of three relationships:
"original," "failure," or "timeout."
</p>
<p>
When the dataflow is triggered, it will have up to the configured amount of time to complete its processing. This time period is configured via
the "Dataflow Timeout" property. If the dataflow has not completed in the allotted amount of time, the dataflow is canceled, and the input FlowFile
is routed to the "timeout" relationship.
</p>
<p>
If any Processor within the dataflow throws an Exception that it does not handle, the dataflow is considered a failure, and the input FlowFile
will be routed to the "failure" relationship.
</p>
<p>
Additionally, if any FlowFile is routed to one of the Ports whose name is listed in the "Failure Ports" property of ExecuteStateless, the dataflow
is considered a failure. In this case, an attribute named "failure.port.name" is added to the input FlowFile. Because multiple ports may be
considered failures, this attribute can be used to differentiate between them.
</p>
<p>
Otherwise, the incoming FlowFile will be routed to the "original" relationship, and any FlowFiles routed to any Output Port of the dataflow will be
transferred to the "output" relationship of the ExecuteStateless Processor. All FlowFiles transferred to the "output" relationship will also have an
attribute named "output.port.name."
</p>
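<p>
The rules above can be summarized in a Python sketch for the case in which the Processor has an incoming connection. It models only the routing
decision, not the real Processor: the <code>FlowFile</code> class and the <code>route_invocation</code> function are hypothetical, and the checks
are applied in the order this section lists them.
</p>
<pre><code class="language-python">from dataclasses import dataclass, field


@dataclass
class FlowFile:
    """Hypothetical minimal FlowFile: only its attributes matter here."""
    attributes: dict = field(default_factory=dict)


def route_invocation(input_flowfile, timed_out, raised_exception, outputs, failure_ports):
    """Return {relationship name: [FlowFiles]} for one invocation of the dataflow.

    outputs: (output port name, FlowFile) pairs produced by the dataflow.
    failure_ports: the names listed in the Failure Ports property.
    """
    if timed_out:
        # Dataflow Timeout elapsed: the dataflow is canceled.
        return {"timeout": [input_flowfile]}
    if raised_exception:
        # An Exception that no Processor handled fails the whole invocation.
        return {"failure": [input_flowfile]}
    for port_name, _ in outputs:
        if port_name in failure_ports:
            # Nothing is sent to "output"; only the input FlowFile goes to "failure".
            input_flowfile.attributes["failure.port.name"] = port_name
            return {"failure": [input_flowfile]}
    for port_name, flowfile in outputs:
        flowfile.attributes["output.port.name"] = port_name
    return {"original": [input_flowfile], "output": [ff for _, ff in outputs]}
</code></pre>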
<h1>Designing Flows for Stateless</h1>
<p>
When designing a flow to use with Stateless, it is important to consider how the flow might want to receive its data and what it might want
to do with the data once it is processed. One option is for the flow to fully encapsulate the source of data and all destinations. For example,
it might have a ConsumeKafkaRecord processor, perform some processing, and then publish to another topic via PublishKafkaRecord.
</p>
<p>
Another option would be to build a flow that sources data from some external source and possibly performs some processing, but does not define the
destination of the data. For example, the flow might consist of a ConsumeKafkaRecord_2_6 processor and perform some filtering and transformation, but stop short
of publishing the data anywhere. Instead, it can transfer the data to an Output Port, which could then be used by ExecuteStateless to bring that
data into the NiFi dataflow.
</p>
<p>
Similarly, a dataflow may leave its source undefined and instead use an Input Port, so that any other dataflow can be built to source
the data and then deliver it to this dataflow, which is responsible for preparing and delivering the data.
</p>
<p>
Finally, the dataflow may define neither the source nor the destination of the data. Instead, it receives data through an
Input Port, performs some filtering, routing, or transformation, and finally provides its processing results to an Output Port.
</p>
<h2>Input Ports</h2>
<p>
When designing a Stateless dataflow, it is recommended to use zero or one Input Port. It is possible, however, to define multiple Input Ports.
In this case, the Input Port property of the ExecuteStateless Processor must be set to dictate which of those Input Ports
the incoming data should be transferred to. Note that the property expects the NAME of the Port, not its identifier. It is also important to note
that the name is case sensitive.
</p>
<h2>Output Ports</h2>
<p>
While it is recommended not to use more than one Input Port, it often makes sense to make use of multiple Output Ports. For example, consider a dataflow
that takes in CSV data representing information about book sales. The dataflow then partitions the data into "large sales" and "small sales," performs
some enrichment, and converts the results into JSON. This dataflow might have four different output ports: "Input CSV," "Large Sales," "Small Sales,"
and "Failure."
</p>
<h2>Parameters</h2>
<p>
When we build a dataflow, it is often important to be able to run it with different configurations in different situations. For example, if we are
consuming from Kafka, we may want to parameterize the Kafka Brokers and the name of the Topic. This is best done by making use of Parameters when
building the dataflow.
</p>
<p>
Once some value has been parameterized, though, we must have some way of conveying values for those parameters to the ExecuteStateless Processor.
To do this, we use user-defined properties. When configuring the ExecuteStateless Processor, in the Properties tab, we can click the '+' icon in the
top-right. This allows us to add a custom property to the Processor. Whatever is used for the name and value of that property will be used as the name
and value of a parameter in the flow.
</p>
<p>
For example, if our dataflow references a Parameter named "Kafka Topic" and we want to run our dataflow using a value of "book-sales," then we can add
a property to ExecuteStateless with the name "Kafka Topic" and the value "book-sales."
</p>
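<p>
The mapping from user-defined properties to Parameters can be sketched in Python. The function names and the partial set of built-in property names
are illustrative assumptions. The <code>#{Parameter Name}</code> syntax is how NiFi components reference Parameters, but this sketch ignores details
such as escaping.
</p>
<pre><code class="language-python">import re

# Built-in properties of the Processor (partial list, for illustration only).
BUILT_IN_PROPERTIES = {"Input Port", "Failure Ports", "Dataflow Timeout"}

# A Parameter reference in a property value looks like #{Parameter Name}.
PARAMETER_REFERENCE = re.compile(r"#\{([^}]+)\}")


def parameters_from_properties(properties):
    """Every user-defined property becomes a Parameter with the same name and value."""
    return {name: value for name, value in properties.items()
            if name not in BUILT_IN_PROPERTIES}


def resolve_references(value, parameters):
    """Replace Parameter references in a property value of the stateless flow.

    Raises KeyError if the flow references a Parameter that was not supplied.
    """
    return PARAMETER_REFERENCE.sub(lambda match: parameters[match.group(1)], value)
</code></pre>
<p>
With a user-defined property named "Kafka Topic" set to "book-sales," a flow property whose value is <code>#{Kafka Topic}</code> resolves to
<code>book-sales</code>.
</p>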
<h1>Exposing the Dataflow</h1>
<p>
Now that we've discussed some considerations for building our dataflow, we must consider how we can expose the dataflow, or make the dataflow available
to the ExecuteStateless processor.
</p>
<p>
We have three options for this. Firstly, we can right-click on the Process Group that we want to expose, and choose to add the Process Group to Version Control
by adding it to the NiFi Registry. This is the recommended approach.
</p>
<p>
However, we can also right-click on the Process Group and instead choose to "Download flow definition." At this point, we can copy the flow definition
JSON file to every node in our cluster. Or, alternatively, we can upload the flow definition to some location that is accessible via a URL from every
node in the cluster. For example, we might choose to check the JSON file into a Git repository and provide the URL to that file to the processor.
</p>
<p>
It is worth noting that if we define the location of the dataflow to be some external URL or to live within the NiFi Registry, we do not want the
Processor to depend on that external service being available and accessible. As a result, when the dataflow is downloaded, it will be cached in the configured
<code>Working Directory</code> and if unable to access the flow at some later time, that cached version will be used.
</p>
<p>
It is also important to note that when using an external URL or NiFi Registry, if the Processor is stopped and started (or NiFi is restarted),
the dataflow may differ from the one that ran the last time. Additionally, two nodes in a cluster could be running
different versions of the flow if they downloaded the file at different times (or if different files were copied to the nodes). When using NiFi Registry, this can be
avoided by explicitly specifying the version of the flow to run.
</p>
<h1>Surfacing NiFi Concepts</h1>
<p>
Because this one Processor internally runs an entire dataflow, several NiFi concepts must be taken into consideration.
</p>
<h2>Data Provenance</h2>
<p>
Throughout the course of a dataflow, many different intermediate FlowFiles may be created, destroyed, transformed, delivered, and fetched.
While traditional NiFi emits Provenance events for each of these, that is not currently possible with the ExecuteStateless Processor.
Because those intermediate FlowFiles are not available outside the stateless dataflow, Provenance events emitted by the dataflow, such as SEND
and RECEIVE events, cannot be surfaced.
</p>
<p>
Any FlowFile that is transferred to the "output" relationship will be shown in a CREATE Provenance event if there is no input FlowFile.
If there is an input FlowFile, the output FlowFiles will be shown in a FORK event, as children forked from the incoming FlowFile.
</p>
<h2>Counters</h2>
<p>
If any component within the stateless dataflow adjusts a counter, the counters that are adjusted are surfaced as counters for the ExecuteStateless
Processor. Consider a dataflow that takes in a single FlowFile and partitions it into two FlowFiles, which are then sent to different Output Ports.
Also consider that the partitioning is performed by a PartitionRecord processor with name PartitionData and ID 167ed9c3-a954-3dba-b6fd-c2e1a4572287.
Then, we may see a counter for the ExecuteStateless processor with a name "Records Processed - PartitionData (167ed9c3-a954-3dba-b6fd-c2e1a4572287)."
This is because the PartitionRecord Processor updates a counter with the name "Records Processed." The additional name and ID of the Processor
are added in order to give context.
</p>
<p>
The counter mentioned above, though, is only incremented for successful invocations of the dataflow. It may also be helpful to know how many
times the counter was updated during failed attempts. However, we do not want to combine the counters for successful and failed attempts, because
that can lead to confusion. Therefore, if the PartitionRecord processor is successful and updates the counter, but the dataflow fails (for example,
a FlowFile is then routed to a Failure Port), the ExecuteStateless processor will now have two counters:
"Records Processed - PartitionData (167ed9c3-a954-3dba-b6fd-c2e1a4572287)" and
"Records Processed - PartitionData (167ed9c3-a954-3dba-b6fd-c2e1a4572287) (Failed attempts)."
</p>
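<p>
The naming scheme described above can be reproduced in a short Python sketch. The format strings follow the example counter names on this page; the
function names and the use of <code>collections.Counter</code> for the running totals are illustrative only.
</p>
<pre><code class="language-python">from collections import Counter


def surfaced_counter_name(counter, component_name, component_id, failed):
    """Name under which an inner counter appears on the ExecuteStateless Processor."""
    name = f"{counter} - {component_name} ({component_id})"
    # Failed invocations get their own counter so they never inflate the success count.
    return f"{name} (Failed attempts)" if failed else name


def record_invocation(totals, adjustments, component_name, component_id, failed):
    """Add one invocation's counter adjustments to the running totals."""
    for counter, delta in adjustments.items():
        totals[surfaced_counter_name(counter, component_name, component_id, failed)] += delta
</code></pre>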
<h2>Bulletins</h2>
<p>
We must also consider how bulletins from Processors within the stateless flow get surfaced to the ExecuteStateless Processor. This can be helpful
for indicating that some concerning behavior is taking place. Any bulletin at a level of WARNING or ERROR that is created while running the stateless
flow will result in a bulletin being created by the ExecuteStateless Processor (assuming that the Bulletin Level configured in the ExecuteStateless
Processor's Settings tab is low enough to allow bulletins of that level).
</p>
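<p>
The filtering can be sketched in Python as two checks: the inner bulletin must be WARNING or ERROR, and it must meet the minimum severity set by the
Bulletin Level. The level names follow this page's wording and may not match NiFi's exact labels; the function name is hypothetical.
</p>
<pre><code class="language-python"># Severity order, lowest first. Names follow this page's wording (an assumption).
LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR"]
SURFACED_LEVELS = {"WARNING", "ERROR"}


def is_surfaced(inner_level, bulletin_level_setting):
    """Would a bulletin raised inside the stateless flow appear on ExecuteStateless?"""
    if inner_level not in SURFACED_LEVELS:
        return False  # DEBUG and INFO bulletins from the inner flow are not surfaced
    # The Bulletin Level in the Settings tab acts as a minimum severity.
    return LEVELS.index(inner_level) >= LEVELS.index(bulletin_level_setting)
</code></pre>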
<h2>Yielding</h2>
<p>
Similarly, if any Processor in the Stateless flow chooses to yield, the ExecuteStateless processor will yield. This is important if there are source
or destination systems that the Stateless flow is unable to communicate with or that have no more data to offer, as this allows us to avoid constantly
interacting with that external service, which could add significant load to it.
</p>
<h1>Performance Considerations</h1>
<p>
There are a few different performance-related considerations to take into account when configuring the ExecuteStateless Processor.
</p>
<h2>Content Storage Strategy</h2>
<p>
One of the most impactful configuration options for the Processor is the configuration of the "Content Storage Strategy" property. For performance
reasons, the processor can be configured to hold all FlowFiles in memory. This includes incoming FlowFiles, as well as intermediate and output FlowFiles.
This can be a significant performance improvement but comes with a significant risk. The content is stored on NiFi's heap. This is the same heap that is shared
by all other ExecuteStateless flows and by NiFi's processors and the NiFi process itself. If the data is very large, it can quickly exhaust the heap, resulting
in OutOfMemoryErrors in NiFi. These, in turn, can result in poor performance, as well as instability of the NiFi process itself. For this reason, it is not
recommended to use the "Store Content on Heap" option unless it is known that all FlowFiles will be small (less than a few MB). To help safeguard
against the Processor receiving an unexpectedly large FlowFile, the "Max Input FlowFile Size" property must be configured when storing data on the heap.
</p>
<p>
Alternatively, and by default, the "Content Storage Strategy" can be configured to store FlowFile content on disk. When this option is used, the content of all
FlowFiles is stored in the configured <code>Working Directory</code>. It is important to note, however, that this data is not meant to be persisted across restarts.
Instead, this simply provides the Stateless Engine with a way to avoid loading everything into memory. Upon restart, the data will be deleted instead of allowing
FlowFiles to resume from where they left off.
</p>
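<p>
The size safeguard can be sketched in Python. The function names are hypothetical, what the Processor does with a rejected FlowFile is not covered
here, and the 1024-based unit table is an assumption of this sketch.
</p>
<pre><code class="language-python"># Assumption: data sizes use 1024-based units, as NiFi data size properties do.
UNITS = {"B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def parse_data_size(text):
    """Convert a value such as "1 MB" into a number of bytes."""
    number, unit = text.split()
    return int(float(number) * UNITS[unit.upper()])


def accepts_input(store_on_heap, flowfile_size, max_input_size=None):
    """Check an input FlowFile's size (in bytes) before it enters the flow."""
    if max_input_size is None:
        if store_on_heap:
            # Storing content on the heap requires an explicit upper bound.
            raise ValueError("Max Input FlowFile Size is required when storing content on heap")
        return True  # content stored on disk and no limit configured
    return max_input_size >= flowfile_size
</code></pre>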
<h2>Concurrent Tasks</h2>
<p>
As noted before, the Stateless Engine is single-threaded. However, the Processor does allow the user to configure more than one concurrent task. In this situation,
each thread/concurrent task runs its own instance of the dataflow. This functions in much the same way as if a single thread were run on each of many different computers.
Any internal state held by the Processors in the dataflow, such as a client created for interacting with another service, is not shared. Additionally, if any Processors are
configured to run on Primary Node only, they will run in every instance.
</p>
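<p>
The following Python sketch models this behavior with one thread per concurrent task. It is a conceptual model, not how the Processor is implemented,
and all class and function names are hypothetical.
</p>
<pre><code class="language-python">import threading


class ServiceClient:
    """Hypothetical stand-in for a client a Processor creates, e.g. for Kafka or JMS."""


class StatelessFlowInstance:
    """One independent copy of the dataflow; no state is shared between copies."""

    def __init__(self):
        self.client = ServiceClient()  # created per instance, never shared
        self.primary_node_only_runs = 0

    def trigger(self):
        # A Processor marked "Primary Node only" still runs in every instance.
        self.primary_node_only_runs += 1


def run_concurrent_tasks(concurrent_tasks):
    """Start one thread per concurrent task, each with its own flow instance."""
    instances = []
    lock = threading.Lock()

    def task():
        flow = StatelessFlowInstance()
        flow.trigger()
        with lock:
            instances.append(flow)

    threads = [threading.Thread(target=task) for _ in range(concurrent_tasks)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return instances
</code></pre>
<p>
With four concurrent tasks, the sketch creates four clients and runs the Primary-Node-only step four times, once per instance.
</p>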
<h2>Run Duration</h2>
<p>
This Processor supports the configuration of NiFi's Run Duration in the Scheduling tab. If the Processor is expected to process many small FlowFiles, it is recommended to configure
this option so that the Processor has a Run Duration of "25 ms." Typically, adjusting the value beyond that offers little benefit, but adjusting from "0 ms" to "25 ms" can
provide a very significant performance improvement for many dataflows, at the cost of potentially introducing up to 25 milliseconds worth of additional latency.
</p>
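<p>
A simplified Python model of what Run Duration changes: rather than handling a single FlowFile per scheduled invocation, the Processor keeps working
until the Run Duration has elapsed, so per-invocation overhead is spread across many FlowFiles. The <code>on_trigger</code> function and its
<code>process</code> callback are hypothetical, and NiFi's scheduler is considerably more involved.
</p>
<pre><code class="language-python">import time
from collections import deque


def on_trigger(queue: deque, run_duration_seconds, process, clock=time.monotonic):
    """Handle queued FlowFiles until the queue is empty or the Run Duration elapses.

    Returns how many FlowFiles were handled in this single invocation.
    """
    start = clock()
    handled = 0
    while queue:
        process(queue.popleft())
        handled += 1
        if clock() - start >= run_duration_seconds:
            break  # with a Run Duration of 0 ms, this stops after one FlowFile
    return handled
</code></pre>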
</body>
</html>