---
title: "Stateful processing with Apache Beam"
date: 2017-02-13 00:00:01 -0800
categories:
- blog
aliases:
- /blog/2017/02/13/stateful-processing.html
authors:
- klk
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
Beam lets you process unbounded, out-of-order, global-scale data with portable
high-level pipelines. Stateful processing is a new feature of the Beam model
that expands the capabilities of Beam, unlocking new use cases and new
efficiencies. In this post, I will guide you through stateful processing in
Beam: how it works, how it fits in with the other features of the Beam model,
what you might use it for, and what it looks like in code.
**Note: This post has been updated in May of 2019, to include Python
snippets!**
<!--more-->
> **Warning: new features ahead!**: This is a very new aspect of the Beam
> model. Runners are still adding support. You can try it out today on multiple
> runners, but do check the [runner capability
> matrix](/documentation/runners/capability-matrix/) for
> the current status in each runner.
First, a quick recap: In Beam, a big data processing _pipeline_ is a directed,
acyclic graph of parallel operations called _`PTransforms`_ processing data
from _`PCollections`_. I'll expand on that by walking through this illustration:
<img class="center-block"
src="/images/blog/stateful-processing/pipeline.png"
alt="A Beam Pipeline - PTransforms are boxes - PCollections are arrows"
width="300">
The boxes are `PTransforms` and the edges represent the data in `PCollections`
flowing from one `PTransform` to the next. A `PCollection` may be _bounded_ (which
means it is finite and you know it) or _unbounded_ (which means you don't know if
it is finite or not - basically, it is like an incoming stream of data that may
or may not ever terminate). The cylinders are the data sources and sinks at the
edges of your pipeline, such as bounded collections of log files or unbounded
data streaming over a Kafka topic. This blog post isn't about sources or sinks,
but about what happens in between - your data processing.
There are two main building blocks for processing your data in Beam: _`ParDo`_,
for performing an operation in parallel across all elements, and _`GroupByKey`_
(and the closely related `CombinePerKey` that I will talk about quite soon)
for aggregating elements to which you have assigned the same key. In the
picture below (featured in many of our presentations) the color indicates the
key of the element. Thus the `GroupByKey`/`CombinePerKey` transform gathers all the
green squares to produce a single output element.
<img class="center-block"
src="/images/blog/stateful-processing/pardo-and-gbk.png"
alt="ParDo and GroupByKey/CombinePerKey:
Elementwise versus aggregating computations"
width="400">
But not all use cases are easily expressed as pipelines of simple `ParDo`/`Map` and
`GroupByKey`/`CombinePerKey` transforms. The topic of this blog post is a new
extension to the Beam programming model: **per-element operation augmented with
mutable state**.
<img class="center-block"
src="/images/blog/stateful-processing/stateful-pardo.png"
alt="Stateful ParDo - sequential per-key processing with persistent state"
width="300">
In the illustration above, ParDo now has a bit of durable, consistent state on
the side, which can be read and written during the processing of each element.
The state is partitioned by key, so it is drawn as having disjoint sections for
each color. It is also partitioned per window, but I thought plaid
<img src="/images/blog/stateful-processing/plaid.png"
alt="A plaid storage cylinder" width="20">
would be a bit much :-). I'll talk about
why state is partitioned this way a bit later, via my first example.
For the rest of this post, I will describe this new feature of Beam in detail -
how it works at a high level, how it differs from existing features, how to
make sure it is still massively scalable. After that introduction at the model
level, I'll walk through a simple example of how you use it in the Beam Java
SDK.
## How does stateful processing in Beam work?
The processing logic of your `ParDo` transform is expressed through the `DoFn`
that it applies to each element. Without stateful augmentations, a `DoFn` is a
mostly-pure function from inputs to one or more outputs, corresponding to the
Mapper in a MapReduce. With state, a `DoFn` has the ability to access
persistent mutable state while processing each input element. Consider this
illustration:
<img class="center-block"
src="/images/blog/stateful-processing/stateful-dofn.png"
alt="Stateful DoFn -
the runner controls input but the DoFn controls storage and output"
width="300">
The first thing to note is that all the data - the little squares, circles, and
triangles - are red. This is to illustrate that stateful processing occurs in
the context of a single key - all of the elements are key-value pairs with the
same key. Calls from your chosen Beam runner to the `DoFn` are colored in
yellow, while calls from the `DoFn` to the runner are in purple:
- The runner invokes the `DoFn`'s `@ProcessElement` method on each element for a
key+window.
- The `DoFn` reads and writes state - the curved arrows to/from the storage on
the side.
- The `DoFn` emits output (or side output) to the runner as usual via
`ProcessContext.output` (resp. `ProcessContext.sideOutput`).
At this very high level, it is pretty intuitive: In your programming
experience, you have probably at some point written a loop over elements that
updates some mutable variables while performing other actions. The interesting
question is how this fits into the Beam model: how does it relate to
other features? How does it scale, since state implies some synchronization?
When should it be used versus other features?
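To make that call pattern concrete, here is a minimal plain-Python sketch. It uses no Beam APIs: `run_stateful` and `running_total` are hypothetical names invented for this illustration, and a real runner adds persistence, windowing, and fault tolerance that this toy leaves out. The "runner" owns one private state dictionary per key and passes it to the function for each element. The function reads state, writes state, and emits output.

```python
from collections import defaultdict


def running_total(value, state):
    """Toy stand-in for a stateful DoFn: reads state, writes state, emits output."""
    total = state.get("total", 0) + value  # read the state cell
    state["total"] = total                 # write the state cell
    yield total                            # emit output


def run_stateful(fn, keyed_elements):
    """Toy stand-in for a runner: keeps one private state dict per key."""
    state_by_key = defaultdict(dict)
    outputs = []
    for key, value in keyed_elements:
        for out in fn(value, state_by_key[key]):
            outputs.append((key, out))
    return outputs


results = run_stateful(
    running_total, [("red", 1), ("blue", 10), ("red", 2), ("red", 3)])
print(results)
```

Each key accumulates independently, which is the property that lets a runner process different keys in parallel.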
## How does stateful processing fit into the Beam model?
To see where stateful processing fits in the Beam model, consider another
way that you can keep some "state" while processing many elements: `CombineFn`. In
Beam, you can write `Combine.perKey(CombineFn)` in Java or
`CombinePerKey(CombineFn)` in Python to apply an associative, commutative
accumulating operation across all the elements with a common key (and window).
Here is a diagram illustrating the basics of a `CombineFn`, the simplest way
that a runner might invoke it on a per-key basis to build an accumulator and
extract an output from the final accumulator:
<img class="center-block"
src="/images/blog/stateful-processing/combinefn.png"
alt="CombineFn - the runner controls input, storage, and output"
width="300">
As with the illustration of stateful `DoFn`, all the data is colored red, since
this is the processing of Combine for a single key. The illustrated method
calls are colored yellow, since they are all controlled by the runner:
- The runner invokes `addInput` on each element to add it to the current
accumulator.
- The runner persists the accumulator when it chooses.
- The runner calls `extractOutput` when ready to emit an output element.
At this point, the diagram for `CombineFn` looks a whole lot like the diagram
for stateful `DoFn`. In practice, the flow of data is, indeed, quite similar.
But there are important differences, even so:
- The runner controls all invocations and storage here. You do not decide when
or how state is persisted, when an accumulator is discarded (based on
triggering) or when output is extracted from an accumulator.
- You can only have one piece of state - the accumulator. In a stateful DoFn
you can read only what you need to know and write only what has changed.
- You don't have the extended features of `DoFn`, such as multiple outputs per
input or side outputs. (These could be simulated by a sufficiently complex
accumulator, but it would not be natural or efficient. Some other features of
`DoFn`, such as side inputs and access to the window, make perfect sense for
`CombineFn`.)
But the main thing that `CombineFn` allows a runner to do is to
`mergeAccumulators`, the concrete expression of the `CombineFn`'s associativity.
This unlocks some huge optimizations: the runner can invoke multiple instances
of a `CombineFn` on a number of inputs and later combine them in a classic
divide-and-conquer architecture, as in this picture:
<img class="center-block"
src="/images/blog/stateful-processing/combiner-lifting.png"
alt="Divide-and-conquer aggregation with a CombineFn"
width="600">
The contract of a `CombineFn` is that the result should be exactly the same,
whether or not the runner decides to actually do such a thing, or even more
complex trees with hot-key fanout, etc.
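The contract can be checked with a small plain-Python sketch. `MeanFn` below is a hypothetical toy class that only borrows the four method names of a `CombineFn`; it is not a Beam class, and the three-way split stands in for whatever partitioning a runner might choose. One "worker" folds every input into a single accumulator, while three others each fold a slice and have their accumulators merged afterwards.

```python
class MeanFn:
    """Toy combiner with CombineFn-style methods (not a Beam class)."""

    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return (total + value, count + 1)

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return (sum(totals), sum(counts))

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("nan")


def accumulate(fn, values):
    """Fold values into a fresh accumulator, as one worker would."""
    acc = fn.create_accumulator()
    for v in values:
        acc = fn.add_input(acc, v)
    return acc


fn = MeanFn()
values = [2, 4, 6, 8, 10, 12]

# One worker sees everything.
sequential = fn.extract_output(accumulate(fn, values))

# Three workers each see a slice; their accumulators are merged afterwards.
partials = [accumulate(fn, values[i:i + 2]) for i in range(0, len(values), 2)]
merged = fn.extract_output(fn.merge_accumulators(partials))

print(sequential, merged)
```

Both paths produce the same mean, which is what allows the runner to pick either one freely.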
This merge operation is not (necessarily) provided by a stateful `DoFn`: the
runner cannot freely branch its execution and recombine the states. Note that
the input elements are still received in an arbitrary order, so the `DoFn` should
be insensitive to ordering and bundling but it doesn't mean the output must be
exactly equal. (fun and easy fact: if the outputs are actually always equal,
then the `DoFn` is an associative and commutative operator)
So now you can see how a stateful `DoFn` differs from `CombineFn`, but I want to
step back and extrapolate this to a high level picture of how state in Beam
relates to using other features to achieve the same or similar goals: In a lot
of cases, what stateful processing represents is a chance to "get under the
hood" of the highly abstract mostly-deterministic functional paradigm of Beam
and do potentially-nondeterministic imperative-style programming that is hard
to express any other way.
## Example: arbitrary-but-consistent index assignment
Suppose that you want to give an index to every incoming element for a
key-and-window. You don't care what the indices are, just as long as they are
unique and consistent. Before diving into the code for how to do this in a Beam
SDK, I'll go over this example from the level of the model. In pictures, you
want to write a transform that maps input to output like this:
<img class="center-block"
src="/images/blog/stateful-processing/assign-indices.png"
alt="Assigning arbitrary but unique indices to each element"
width="180">
The order of the elements A, B, C, D, E is arbitrary, hence their assigned
indices are arbitrary, but downstream transforms just need to be OK with this.
There is no associativity or commutativity as far as the actual values are
concerned. The order-insensitivity of this transform only extends to the point
of ensuring the necessary properties of the output: no duplicated indices, no
gaps, and every element gets an index.
Conceptually expressing this as a stateful loop is as trivial as you can
imagine: The state you should store is the next index.
- As an element comes in, output it along with the next index.
- Increment the index.
This presents a good opportunity to talk about big data and parallelism,
because the algorithm in those bullet points is not parallelizable at all! If
you wanted to apply this logic over an entire `PCollection`, you would have to
process each element of the `PCollection` one-at-a-time... this is obviously a
bad idea. State in Beam is tightly scoped so that most of the time a stateful
`ParDo` transform should still be possible for a runner to execute in parallel,
though you still have to be thoughtful about it.
A state cell in Beam is scoped to a key+window pair. When your DoFn reads or
writes state by the name of `"index"`, it is actually accessing a mutable cell
specified by `"index"` _along with_ the key and window currently being
processed. So, when thinking about a state cell, it may be helpful to consider
the full state of your transform as a table, where the rows are named according
to names you use in your program, like `"index"`, and the columns are
key+window pairs, like this:
{{< table >}}
| | (key, window)<sub>1</sub> | (key, window)<sub>2</sub> | (key, window)<sub>3</sub> | ... |
|---------------|---------------------------|---------------------------|---------------------------|-----|
| `"index"` | `3` | `7` | `15` | ... |
| `"fizzOrBuzz?"` | `"fizz"` | `"7"` | `"fizzbuzz"` | ... |
| ... | ... | ... | ... | ... |
{{< /table >}}
(if you have a superb spatial sense, feel free to imagine this as a cube where
keys and windows are independent dimensions)
You can provide the opportunity for parallelism by making sure that table has
enough columns. You might have many keys and many windows, or you might have
many of just one or the other:
- Many keys in few windows, for example a globally windowed stateful computation
keyed by user ID.
- Many windows over few keys, for example a fixed windowed stateful computation
over a global key.
Caveat: all Beam runners today parallelize only over the key.
Most often your mental model of state can be focused on only a single column of
the table, a single key+window pair. Cross-column interactions do not occur
directly, by design.
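As a rough illustration of that table, here is a plain-Python sketch of index assignment in which every state cell is addressed by a (key, window) pair. Nothing here is a Beam API: `WINDOW_SIZE`, `window_of`, `state_table`, and `assign_index` are hypothetical names, and windows are simplified to fixed 60-second intervals identified by their start time.

```python
from collections import defaultdict

WINDOW_SIZE = 60  # seconds; hypothetical fixed-window size


def window_of(timestamp):
    """Start of the fixed window containing the timestamp."""
    return timestamp - timestamp % WINDOW_SIZE


# One column per (key, window); each column maps state names to values.
state_table = defaultdict(dict)


def assign_index(key, timestamp, value):
    """Read the "index" cell for this key+window, emit, then increment."""
    cell = state_table[(key, window_of(timestamp))]
    index = cell.get("index", 0)
    cell["index"] = index + 1
    return (index, value)


# (key, timestamp, value) triples
events = [("a", 5, "A"), ("b", 7, "B"), ("a", 30, "C"), ("a", 65, "D"), ("b", 59, "E")]
indexed = [assign_index(*event) for event in events]
print(indexed)
print(dict(state_table))
```

Element `D` has the same key as `A` and `C` but falls in a later window, so it starts again from index 0 in its own column.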
## State in Beam's Java SDK
Now that I have talked a bit about stateful processing in the Beam model and
worked through an abstract example, I'd like to show you what it looks like to
write stateful processing code using Beam's Java SDK. Here is the code for a
stateful `DoFn` that assigns an arbitrary-but-consistent index to each element
on a per key-and-window basis:
{{< highlight java >}}
new DoFn<KV<MyKey, MyValue>, KV<Integer, KV<MyKey, MyValue>>>() {

  // A state cell holding a single Integer per key+window
  @StateId("index")
  private final StateSpec<ValueState<Integer>> indexSpec =
      StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("index") ValueState<Integer> index) {
    int current = firstNonNull(index.read(), 0);
    context.output(KV.of(current, context.element()));
    index.write(current+1);
  }
}
{{< /highlight >}}
{{< highlight py >}}
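from apache_beam import DoFn
from apache_beam.transforms.userstate import CombiningValueStateSpec

class IndexAssigningStatefulDoFn(DoFn):
  # A state cell that combines the values added to it using sum
  INDEX_STATE = CombiningValueStateSpec('index', sum)

  def process(self, element, index=DoFn.StateParam(INDEX_STATE)):
    unused_key, value = element
    current_index = index.read()
    yield (value, current_index)
    index.add(1)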
{{< /highlight >}}
Let's dissect this:
- The first thing to look at is the presence of a couple of `@StateId("index")`
annotations. This calls out that you are using a mutable state cell named
"index" in this `DoFn`. The Beam Java SDK, and from there your chosen runner,
will also note these annotations and use them to wire up your DoFn correctly.
- The first `@StateId("index")` is annotated on a field of type `StateSpec` (for
"state specification"). This declares and configures the state cell. The
type parameter `ValueState` describes the kind of state you can get out of this
cell - `ValueState` stores just a single value. Note that the spec itself is not
a usable state cell - you need the runner to provide that during pipeline
execution.
- To fully specify a `ValueState` cell, you need to provide the coder
that the runner will use (as necessary) to serialize the value
you will be storing. This is the invocation `StateSpecs.value(VarIntCoder.of())`.
- The second `@StateId("index")` annotation is on a parameter to your
`@ProcessElement` method. This indicates access to the ValueState cell that
was specified earlier.
- The state is accessed in the simplest way: `read()` to read it, and
`write(newvalue)` to write it.
- The other features of `DoFn` are available in the usual way - such as
`context.output(...)`. You can also use side inputs, side outputs, gain access
to the window, etc.
A few notes on how the SDK and runners see this DoFn:
- Your state cells are all explicitly declared so a Beam SDK or runner can
reason about them, for example to clear them out when a window expires.
- If you declare a state cell and then use it with the wrong type, the Beam
Java SDK will catch that error for you.
- If you declare two state cells with the same ID, the SDK will catch that,
too.
- The runner knows that this is a stateful `DoFn` and may run it quite
differently, for example by additional data shuffling and synchronization in
order to avoid concurrent access to state cells.
Let's look at one more example of how to use this API, this time a bit more real-world.
## Example: anomaly detection
Suppose you are feeding a stream of actions by your user into some complex
model to predict some quantitative expression of the sorts of actions they
take, for example to detect fraudulent activity. You will build up the model
from events, and also compare incoming events against the latest model to
determine if something has changed.
If you try to express the building of your model as a `CombineFn`, you may have
trouble with `mergeAccumulators`. Assuming you could express that, it might
look something like this:
{{< highlight java >}}
class ModelFromEventsFn extends CombineFn<Event, Model, Model> {
  @Override
  public Model createAccumulator() {
    return Model.empty();
  }

  @Override
  public Model addInput(Model accumulator, Event input) {
    return accumulator.update(input); // this is encouraged to mutate, for efficiency
  }

  @Override
  public Model mergeAccumulators(Iterable<Model> accumulators) {
    // ?? can you write this ??
  }

  @Override
  public Model extractOutput(Model accumulator) {
    return accumulator;
  }
}
{{< /highlight >}}
{{< highlight py >}}
class ModelFromEventsFn(beam.CombineFn):
  def create_accumulator(self):
    # Create a new empty model
    return Model()

  def add_input(self, model, input):
    return model.update(input)

  def merge_accumulators(self, accumulators):
    # Custom merging logic: can you write this?
    raise NotImplementedError

  def extract_output(self, model):
    return model
{{< /highlight >}}
Now you have a way to compute the model of a particular user for a window as
`Combine.perKey(new ModelFromEventsFn())`. How would you apply this model to
the same stream of events from which it is calculated? A standard way to
take the result of a `Combine` transform and use it while processing the
elements of a `PCollection` is to read it as a side input to a `ParDo`
transform. So you could side input the model and check the stream of events
against it, outputting the prediction, like so:
{{< highlight java >}}
PCollection<KV<UserId, Event>> events = ...

final PCollectionView<Map<UserId, Model>> userModels = events
    .apply(Combine.perKey(new ModelFromEventsFn()))
    .apply(View.asMap());

PCollection<KV<UserId, Prediction>> predictions = events
    .apply(ParDo.of(new DoFn<KV<UserId, Event>, KV<UserId, Prediction>>() {

      @ProcessElement
      public void processElement(ProcessContext ctx) {
        UserId userId = ctx.element().getKey();
        Event event = ctx.element().getValue();

        Model model = ctx.sideInput(userModels).get(userId);

        // Perhaps some logic around when to output a new prediction
        … ctx.output(KV.of(userId, model.prediction(event))) …
      }
    }).withSideInputs(userModels));
{{< /highlight >}}
{{< highlight py >}}
# Events is a collection of (user, event) pairs.
events = (p | ReadFromEventSource() | beam.WindowInto(....))

user_models = beam.pvalue.AsDict(
    events
    | beam.CombinePerKey(ModelFromEventsFn()))

def event_prediction(user_event, models):
  user = user_event[0]
  event = user_event[1]

  # Retrieve the model calculated for this user
  model = models[user]

  return (user, model.prediction(event))

# Predictions is a collection of (user, prediction) pairs.
predictions = events | beam.Map(event_prediction, user_models)
{{< /highlight >}}
In this pipeline, there is just one model emitted by the `Combine.perKey(...)`
per user, per window, which is then prepared for side input by the `View.asMap()`
transform. The processing of the `ParDo` over events will block until that side
input is ready, buffering events, and will then check each event against the
model. This is a high latency, high completeness solution: The model takes into
account all user behavior in the window, but there can be no output until the
window is complete.
Suppose you want to get some results earlier, or don't even have any
natural windowing, but just want continuous analysis with the "model so far",
even though your model may not be as complete. How can you control the updates
to the model against which you are checking your events? Triggers are the
generic Beam feature for managing completeness versus latency tradeoffs. So here
is the same pipeline with an added trigger that outputs a new model one second
after input arrives:
{{< highlight java >}}
PCollection<KV<UserId, Event>> events = ...

PCollectionView<Map<UserId, Model>> userModels = events

    // A tradeoff between latency and cost
    .apply(Window.<KV<UserId, Event>>configure().triggering(
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(1))))

    .apply(Combine.perKey(new ModelFromEventsFn()))
    .apply(View.asMap());
{{< /highlight >}}
{{< highlight py >}}
events = ...

user_models = beam.pvalue.AsDict(
    events
    | beam.WindowInto(GlobalWindows(),
                      trigger=trigger.AfterAll(
                          trigger.AfterCount(1),
                          trigger.AfterProcessingTime(1)))
    | beam.CombinePerKey(ModelFromEventsFn()))
{{< /highlight >}}
This is often a pretty nice tradeoff between latency and cost: If a huge flood
of events comes in a second, then you will only emit one new model, so you
won't be flooded with model outputs that you cannot even use before they are
obsolete. In practice, the new model may not be present on the side input
channel until many more seconds have passed, due to caches and processing
delays preparing the side input. Many events (maybe an entire batch of
activity) will have passed through the `ParDo` and had their predictions
calculated according to the prior model. If the runner gave a tight enough
bound on cache expirations and you used a more aggressive trigger, you might be
able to improve latency at additional cost.
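The throttling effect of such a trigger can be sketched in a few lines of plain Python. This is a simplified simulation, not Beam's trigger implementation: `firing_times` is a hypothetical helper, arrival times are processing-time seconds, and an element arriving exactly at a deadline is assumed to start the next pane.

```python
def firing_times(arrival_times, delay=1.0):
    """Times at which a 'delay after first element in pane' trigger would fire."""
    firings = []
    deadline = None
    for t in sorted(arrival_times):
        if deadline is None or t >= deadline:
            # First element of a new pane: schedule exactly one firing.
            deadline = t + delay
            firings.append(deadline)
        # Later elements in the same pane do not schedule anything new.
    return firings


arrivals = [0.0, 0.1, 0.2, 0.9, 3.0, 3.5, 3.99, 10.0]
fired = firing_times(arrivals)
print(f"{len(arrivals)} events, {len(fired)} model updates at {fired}")
```

Eight events produce only three model updates, however tightly the events are packed inside each one-second pane.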
But there is another cost to consider: you are outputting many uninteresting
outputs from the `ParDo` that will be processed downstream. If the
"interestingness" of the output is only well-defined relative to the prior
output, then you cannot use a `Filter` transform to reduce data volume downstream.
Stateful processing lets you address both the latency problem of side inputs
and the cost problem of excessive uninteresting output. Here is the code, using
only features I have already introduced:
{{< highlight java >}}
new DoFn<KV<UserId, Event>, KV<UserId, Prediction>>() {

  @StateId("model")
  private final StateSpec<ValueState<Model>> modelSpec =
      StateSpecs.value(Model.coder());

  @StateId("previousPrediction")
  private final StateSpec<ValueState<Prediction>> previousPredictionSpec =
      StateSpecs.value(Prediction.coder());

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("previousPrediction") ValueState<Prediction> previousPredictionState,
      @StateId("model") ValueState<Model> modelState) {
    UserId userId = c.element().getKey();
    Event event = c.element().getValue();

    // State is empty (null) for the first element of a key+window
    Model model = firstNonNull(modelState.read(), Model.empty());
    Prediction previousPrediction = previousPredictionState.read();
    Prediction newPrediction = model.prediction(event);
    model.update(event);
    modelState.write(model);
    if (previousPrediction == null
        || shouldOutputNewPrediction(previousPrediction, newPrediction)) {
      c.output(KV.of(userId, newPrediction));
      previousPredictionState.write(newPrediction);
    }
  }
};
{{< /highlight >}}
{{< highlight py >}}
class ModelStatefulFn(beam.DoFn):

  PREVIOUS_PREDICTION = BagStateSpec('previous_pred_state', PredictionCoder())
  MODEL_STATE = CombiningValueStateSpec('model_state',
                                        ModelCoder(),
                                        ModelFromEventsFn())

  def process(self,
              user_event,
              previous_pred_state=beam.DoFn.StateParam(PREVIOUS_PREDICTION),
              model_state=beam.DoFn.StateParam(MODEL_STATE)):
    user = user_event[0]
    event = user_event[1]
    model = model_state.read()
    # The bag holds at most one element: the last prediction that was output
    previous_prediction = next(iter(previous_pred_state.read()), None)

    new_prediction = model.prediction(event)
    model_state.add(event)

    if (previous_prediction is None
        or self.should_output_prediction(
            previous_prediction, new_prediction)):
      previous_pred_state.clear()
      previous_pred_state.add(new_prediction)
      yield (user, new_prediction)
{{< /highlight >}}
Let's walk through it:
- You have two state cells declared, `@StateId("model")` to hold the current
state of the model for a user and `@StateId("previousPrediction")` to hold
the prediction output previously.
- Access to the two state cells by annotation in the `@ProcessElement` method
is as before.
- You read the current model via `modelState.read()`. Because state is
per-key-and-window, this is a model just for the UserId of the Event
currently being processed.
- You derive a new prediction `model.prediction(event)` and compare it against
the last one you output, accessed via
`previousPredictionState.read()`.
- You then update the model `model.update()` and write it via
`modelState.write(...)`. It is perfectly fine to mutate the value
you pulled out of state as long as you also remember to write the mutated
value, in the same way you are encouraged to mutate `CombineFn` accumulators.
- If the prediction has changed a significant amount since the last time you
output, you emit it via `c.output(...)` and
save the prediction using `previousPredictionState.write(...)`.
Here the decision is relative to the prior prediction output, not the last
one computed - realistically you might have some complex conditions here.
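The same control flow can be tried out without a runner. The following plain-Python sketch mirrors the steps above with a deliberately trivial model: everything in it is a stand-in invented for illustration, including the running-mean "model", the distance-from-mean "prediction", and the threshold in `should_output`.

```python
from collections import defaultdict


def should_output(previous, new, threshold=5.0):
    """Hypothetical 'interestingness' test, relative to the last emitted output."""
    return previous is None or abs(new - previous) >= threshold


def process(user, amount, state):
    """Mirrors the stateful DoFn: predict, update the model, maybe emit."""
    model = state.setdefault("model", {"total": 0.0, "count": 0})

    # Toy prediction: how far this event is from the user's mean so far.
    mean = model["total"] / model["count"] if model["count"] else amount
    new_prediction = abs(amount - mean)

    # Update the model in place; it lives in this user's state.
    model["total"] += amount
    model["count"] += 1

    if should_output(state.get("previous_prediction"), new_prediction):
        state["previous_prediction"] = new_prediction
        yield (user, new_prediction)


state_by_user = defaultdict(dict)
events = [("u1", 10.0), ("u1", 10.0), ("u1", 11.0), ("u1", 50.0), ("u2", 3.0)]
emitted = [out
           for user, amount in events
           for out in process(user, amount, state_by_user[user])]
print(emitted)
```

Five events yield three outputs: the first prediction for each user, plus the one for `u1` that differs sharply from the last prediction emitted. The two unremarkable events in between update the model but produce nothing downstream.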
Most of the above is just talking through Java! But before you go out and
convert all of your pipelines to use stateful processing, I want to go over
some considerations as to whether it is a good fit for your use case.
## Performance considerations
To decide whether to use per-key-and-window state, you need to consider how it
executes. You can dig into how a particular runner manages state, but there are
some general things to keep in mind:
- Partitioning per-key-and-window: perhaps the most important thing to
consider is that the runner may have to shuffle your data to colocate all
the data for a particular key+window. If the data is already shuffled
correctly, the runner may take advantage of this.
- Synchronization overhead: the API is designed so the runner takes care of
concurrency control, but this means that the runner cannot parallelize
processing of elements for a particular key+window even when it would otherwise
be advantageous.
- Storage and fault tolerance of state: since state is per-key-and-window, the
more keys and windows you expect to process simultaneously, the more storage
you will incur. Because state benefits from all the fault tolerance /
consistency properties of your other data in Beam, it also adds to the cost of
committing the results of processing.
- Expiration of state: also since state is per-window, the runner can reclaim
the resources when a window expires (when the watermark exceeds its allowed
lateness) but this could mean that the runner is tracking an additional timer
per key and window to cause reclamation code to execute.
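To illustrate the last point, here is a simplified plain-Python sketch of window-based state reclamation. It is not how any particular runner implements it: `ALLOWED_LATENESS`, `expire`, and the layout of `state` are assumptions made for this example, and a real runner would typically act on a timer per key and window rather than scanning all state.

```python
ALLOWED_LATENESS = 10  # hypothetical, in the same units as the timestamps

# (key, window_end) -> state cells for that key and window
state = {
    ("u1", 60): {"index": 4},
    ("u2", 60): {"index": 1},
    ("u1", 120): {"index": 2},
}


def expire(state, watermark):
    """Drop state for every window whose end plus allowed lateness has passed."""
    expired = [kw for kw in state if kw[1] + ALLOWED_LATENESS < watermark]
    for kw in expired:
        del state[kw]
    return expired


dropped = expire(state, watermark=75)
print("dropped:", dropped)
print("remaining:", list(state))
```

With the watermark at 75, both cells of the window ending at 60 are reclaimed, while the window ending at 120 keeps its state.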
## Go use it!
If you are new to Beam, I hope you are now interested in seeing if Beam with
stateful processing addresses your use case. If you are already using Beam, I
hope this new addition to the model unlocks new use cases for you. Do check
the [capability
matrix](/documentation/runners/capability-matrix/) to
see the level of support for this new model feature on your favorite
backend(s).
And please do join the community at
[user@beam.apache.org](/get-started/support). We'd love to
hear from you.