blob: ce3c670521a1d74c1f1005e687323ba88de45e38 [file] [log] [blame] [view]
---
title: "Process Function (Low-level Operations)"
nav-title: "Process Function"
nav-parent_id: streaming
nav-pos: 35
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
* This will be replaced by the TOC
{:toc}
## The ProcessFunction
The `ProcessFunction` is a low-level stream processing operation, giving access to the basic building blocks of
all (acyclic) streaming applications:
- events (stream elements)
- state (fault tolerant, consistent)
- timers (event time and processing time)
The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events
be being invoked for each event received in the input stream(s).
For fault tolerant state, the `ProcessFunction` gives access to Flink's [keyed state](state.html), accessible via the
`RuntimeContext`, similar to the way other stateful functions can access keyed state. Like all functions with keyed state,
the `ProcessFunction` needs to be applied onto a `KeyedStream`:
```java
stream.keyBy("id").process(new MyProcessFunction())
```
The timers allow applications to react to changes in processing time and in [event time](../event_time.html).
Every call to the function `processElement(...)` gets a `Context` object with gives access to the element's
event time timestamp, and to the *TimerService*. The `TimerService` can be used to register callbacks for future
event-/processing- time instants. When a timer's particular time is reached, the `onTimer(...)` method is
called. During that call, all states are again scoped to the key with which the timer was created, allowing
timers to perform keyed state manipulation as well.
## Low-level Joins
To realize low-level operations on two inputs, applications can use `CoProcessFunction`. It relates to `ProcessFunction`
in the same way that `CoFlatMapFunction` relates to `FlatMapFunction`: the function is bound to two different inputs and
gets individual calls to `processElement1(...)` and `processElement2(...)` for records from the two different inputs.
Implementing a low level join typically follows this pattern:
- Create a state object for one input (or both)
- Update the state upon receiving elements from its input
- Upon receiving elements from the other input, probe the state and produce the joined result
For example, you might be joining customer data to financial trades,
while keeping state for the customer data. If you care about having
complete and deterministic joins in the face of out-of-order events,
you can use a timer to evaluate and emit the join for a trade when the
watermark for the customer data stream has passed the time of that
trade.
## Example
The following example maintains counts per key, and emits a key/count pair whenever a minute passes (in event time) without an update for that key:
- The count, key, and last-modification-timestamp are stored in a `ValueState`, which is implicitly scoped by key.
- For each record, the `ProcessFunction` increments the counter and sets the last-modification timestamp
- The function also schedules a callback one minute into the future (in event time)
- Upon each callback, it checks the callback's event time timestamp against the last-modification time of the stored count
and emits the key/count if they match (i.e., no further update occurred during that minute)
*Note:* This simple example could have been implemented with session windows. We use `ProcessFunction` here to illustrate
the basic pattern it provides.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;
// the source data stream
DataStream<Tuple2<String, String>> stream = ...;
// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
.keyBy(0)
.process(new CountWithTimeoutFunction());
/**
* The data type stored in the state
*/
public class CountWithTimestamp {
public String key;
public long count;
public long lastModified;
}
/**
* The implementation of the ProcessFunction that maintains the count and timeouts
*/
public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
/** The state that is maintained by this process function */
private ValueState<CountWithTimestamp> state;
@Override
public void open(Configuration parameters) throws Exception {
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// retrieve the current count
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// update the state's count
current.count++;
// set the state's timestamp to the record's assigned event time timestamp
current.lastModified = ctx.timestamp();
// write the state back
state.update(current);
// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// get the state for the key that scheduled the timer
CountWithTimestamp result = state.value();
// check if this is an outdated timer or the latest timer
if (timestamp == result.lastModified) {
// emit the state
out.collect(new Tuple2<String, Long>(result.key, result.count));
}
}
}
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;
// the source data stream
DataStream<Tuple2<String, String>> stream = ...;
// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
.keyBy(0)
.process(new CountWithTimeoutFunction());
/**
* The data type stored in the state
*/
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
/**
* The implementation of the ProcessFunction that maintains the count and timeouts
*/
class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
/** The state that is maintained by this process function */
lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
.getState(new ValueStateDescriptor<>("myState", clasOf[CountWithTimestamp]))
override def processElement(value: (String, Long), ctx: Context, out: Collector[(String, Long)]): Unit = {
// initialize or retrieve/update the state
val current: CountWithTimestamp = state.value match {
case null =>
CountWithTimestamp(key, 1, ctx.timestamp)
case CountWithTimestamp(key, count, time) =>
CountWithTimestamp(key, count + 1, ctx.timestamp)
}
// write the state back
state.update(current)
// schedule the next timer 60 seconds from the current event time
ctx.timerService.registerEventTimeTimer(current.timestamp + 60000)
}
override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
state.value match {
case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp) =>
out.collect((key, count))
case _ =>
}
}
}
{% endhighlight %}
</div>
</div>
{% top %}