| --- |
| title: "DataStream API" |
| nav-id: datastreamwalkthrough |
| nav-title: 'DataStream API' |
| nav-parent_id: walkthroughs |
| nav-pos: 1 |
| --- |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| |
| Apache Flink offers a DataStream API for building robust, stateful streaming applications. |
| It provides fine-grained control over state and time, which allows for the implementation of advanced event-driven systems. |
| In this step-by-step guide you'll learn how to build a stateful streaming application with Flink's DataStream API. |
| |
| * This will be replaced by the TOC |
| {:toc} |
| |
| ## What Are You Building? |
| |
| Credit card fraud is a growing concern in the digital age. |
| Criminals steal credit card numbers by running scams or hacking into insecure systems. |
| Stolen numbers are tested by making one or more small purchases, often for a dollar or less. |
| If that works, they then make more significant purchases to get items they can sell or keep for themselves. |
| |
| In this tutorial, you will build a fraud detection system for alerting on suspicious credit card transactions. |
| Using a simple set of rules, you will see how Flink allows us to implement advanced business logic and act in real-time. |
| |
| ## Prerequisites |
| |
| This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language. |
| |
| ## Help, I’m Stuck! |
| |
| If you get stuck, check out the [community support resources](https://flink.apache.org/gettinghelp.html). |
| In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly. |
| |
| ## How to Follow Along |
| |
| If you want to follow along, you will require a computer with: |
| |
| * Java 8 or 11 |
| * Maven |
| |
| A provided Flink Maven Archetype will create a skeleton project with all the necessary dependencies quickly, so you only need to focus on filling out the business logic. |
| These dependencies include `flink-streaming-java` which is the core dependency for all Flink streaming applications and `flink-walkthrough-common` that has data generators and other classes specific to this walkthrough. |
| |
| {% panel **Note:** Each code block within this walkthrough may not contain the full surrounding class for brevity. The full code is available [at the bottom of the page](#final-application). %} |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight bash %} |
| $ mvn archetype:generate \ |
| -DarchetypeGroupId=org.apache.flink \ |
| -DarchetypeArtifactId=flink-walkthrough-datastream-java \{% unless site.is_stable %} |
| -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} |
| -DarchetypeVersion={{ site.version }} \ |
| -DgroupId=frauddetection \ |
| -DartifactId=frauddetection \ |
| -Dversion=0.1 \ |
| -Dpackage=spendreport \ |
| -DinteractiveMode=false |
| {% endhighlight %} |
| </div> |
| <div data-lang="scala" markdown="1"> |
| {% highlight bash %} |
| $ mvn archetype:generate \ |
| -DarchetypeGroupId=org.apache.flink \ |
| -DarchetypeArtifactId=flink-walkthrough-datastream-scala \{% unless site.is_stable %} |
| -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} |
| -DarchetypeVersion={{ site.version }} \ |
| -DgroupId=frauddetection \ |
| -DartifactId=frauddetection \ |
| -Dversion=0.1 \ |
| -Dpackage=spendreport \ |
| -DinteractiveMode=false |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| {% unless site.is_stable %} |
| <p style="border-radius: 5px; padding: 5px" class="bg-danger"> |
| <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the commandline. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven official document</a> |
| </p> |
| {% endunless %} |
| |
| You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters, |
| Maven will create a folder named `frauddetection` that contains a project with all the dependencies to complete this tutorial. |
| After importing the project into your editor, you can find a file `FraudDetectionJob.java` (or `FraudDetectionJob.scala`) with the following code which you can run directly inside your IDE. |
| Try setting break points through out the data stream and run the code in DEBUG mode to get a feeling for how everything works. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| #### FraudDetectionJob.java |
| |
| {% highlight java %} |
| package spendreport; |
| |
| import org.apache.flink.streaming.api.datastream.DataStream; |
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
| import org.apache.flink.walkthrough.common.sink.AlertSink; |
| import org.apache.flink.walkthrough.common.entity.Alert; |
| import org.apache.flink.walkthrough.common.entity.Transaction; |
| import org.apache.flink.walkthrough.common.source.TransactionSource; |
| |
| public class FraudDetectionJob { |
| |
| public static void main(String[] args) throws Exception { |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| |
| DataStream<Transaction> transactions = env |
| .addSource(new TransactionSource()) |
| .name("transactions"); |
| |
| DataStream<Alert> alerts = transactions |
| .keyBy(Transaction::getAccountId) |
| .process(new FraudDetector()) |
| .name("fraud-detector"); |
| |
| alerts |
| .addSink(new AlertSink()) |
| .name("send-alerts"); |
| |
| env.execute("Fraud Detection"); |
| } |
| } |
| {% endhighlight %} |
| |
| #### FraudDetector.java |
| {% highlight java %} |
| package spendreport; |
| |
| import org.apache.flink.streaming.api.functions.KeyedProcessFunction; |
| import org.apache.flink.util.Collector; |
| import org.apache.flink.walkthrough.common.entity.Alert; |
| import org.apache.flink.walkthrough.common.entity.Transaction; |
| |
| public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { |
| |
| private static final long serialVersionUID = 1L; |
| |
| private static final double SMALL_AMOUNT = 1.00; |
| private static final double LARGE_AMOUNT = 500.00; |
| private static final long ONE_MINUTE = 60 * 1000; |
| |
| @Override |
| public void processElement( |
| Transaction transaction, |
| Context context, |
| Collector<Alert> collector) throws Exception { |
| |
| Alert alert = new Alert(); |
| alert.setId(transaction.getAccountId()); |
| |
| collector.collect(alert); |
| } |
| } |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| #### FraudDetectionJob.scala |
| |
| {% highlight scala %} |
| package spendreport |
| |
| import org.apache.flink.streaming.api.scala._ |
| import org.apache.flink.walkthrough.common.sink.AlertSink |
| import org.apache.flink.walkthrough.common.entity.Alert |
| import org.apache.flink.walkthrough.common.entity.Transaction |
| import org.apache.flink.walkthrough.common.source.TransactionSource |
| |
| object FraudDetectionJob { |
| |
| @throws[Exception] |
| def main(args: Array[String]): Unit = { |
| val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment |
| |
| val transactions: DataStream[Transaction] = env |
| .addSource(new TransactionSource) |
| .name("transactions") |
| |
| val alerts: DataStream[Alert] = transactions |
| .keyBy(transaction => transaction.getAccountId) |
| .process(new FraudDetector) |
| .name("fraud-detector") |
| |
| alerts |
| .addSink(new AlertSink) |
| .name("send-alerts") |
| |
| env.execute("Fraud Detection") |
| } |
| } |
| {% endhighlight %} |
| |
| #### FraudDetector.scala |
| |
| {% highlight scala %} |
| package spendreport |
| |
| import org.apache.flink.streaming.api.functions.KeyedProcessFunction |
| import org.apache.flink.util.Collector |
| import org.apache.flink.walkthrough.common.entity.Alert |
| import org.apache.flink.walkthrough.common.entity.Transaction |
| |
| object FraudDetector { |
| val SMALL_AMOUNT: Double = 1.00 |
| val LARGE_AMOUNT: Double = 500.00 |
| val ONE_MINUTE: Long = 60 * 1000L |
| } |
| |
| @SerialVersionUID(1L) |
| class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { |
| |
| @throws[Exception] |
| def processElement( |
| transaction: Transaction, |
| context: KeyedProcessFunction[Long, Transaction, Alert]#Context, |
| collector: Collector[Alert]): Unit = { |
| |
| val alert = new Alert |
| alert.setId(transaction.getAccountId) |
| |
| collector.collect(alert) |
| } |
| } |
| |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| ## Breaking Down the Code |
| |
| Let's walk step-by-step through the code of these two files. The `FraudDetectionJob` class defines the data flow of the application and the `FraudDetector` class defines the business logic of the function that detects fraudulent transactions. |
| |
| We start describing how the Job is assembled in the `main` method of the `FraudDetectionJob` class. |
| |
| #### The Execution Environment |
| |
| The first line sets up your `StreamExecutionEnvironment`. |
| The execution environment is how you set properties for your Job, create your sources, and finally trigger the execution of the Job. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| #### Creating a Source |
| |
| Sources ingest data from external systems, such as Apache Kafka, Rabbit MQ, or Apache Pulsar, into Flink Jobs. |
| This walkthrough uses a source that generates an infinite stream of credit card transactions for you to process. |
| Each transaction contains an account ID (`accountId`), timestamp (`timestamp`) of when the transaction occurred, and US$ amount (`amount`). |
| The `name` attached to the source is just for debugging purposes, so if something goes wrong, we will know where the error originated. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| DataStream<Transaction> transactions = env |
| .addSource(new TransactionSource()) |
| .name("transactions"); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| val transactions: DataStream[Transaction] = env |
| .addSource(new TransactionSource) |
| .name("transactions") |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| |
| #### Partitioning Events & Detecting Fraud |
| |
| The `transactions` stream contains a lot of transactions from a large number of users, such that it needs to be processed in parallel my multiple fraud detection tasks. Since fraud occurs on a per-account basis, you must ensure that all transactions for the same account are processed by the same parallel task of the fraud detector operator. |
| |
| To ensure that the same physical task processes all records for a particular key, you can partition a stream using `DataStream#keyBy`. |
| The `process()` call adds an operator that applies a function to each partitioned element in the stream. |
| It is common to say the operator immediately after a `keyBy`, in this case `FraudDetector`, is executed within a _keyed context_. |
| |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| DataStream<Alert> alerts = transactions |
| .keyBy(Transaction::getAccountId) |
| .process(new FraudDetector()) |
| .name("fraud-detector"); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| val alerts: DataStream[Alert] = transactions |
| .keyBy(transaction => transaction.getAccountId) |
| .process(new FraudDetector) |
| .name("fraud-detector") |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| #### Outputting Results |
| |
| A sink writes a `DataStream` to an external system; such as Apache Kafka, Cassandra, and AWS Kinesis. |
| The `AlertSink` logs each `Alert` record with log level **INFO**, instead of writing it to persistent storage, so you can easily see your results. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| alerts.addSink(new AlertSink()); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| alerts.addSink(new AlertSink) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| #### Executing the Job |
| |
| Flink applications are built lazily and shipped to the cluster for execution only once fully formed. |
| Call `StreamExecutionEnvironment#execute` to begin the execution of our Job and give it a name. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| env.execute("Fraud Detection"); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| env.execute("Fraud Detection") |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| #### The Fraud Detector |
| |
| The fraud detector is implemented as a `KeyedProcessFunction`. |
| Its method `KeyedProcessFunction#processElement` is called for every transaction event. |
| This first version produces an alert on every transaction, which some may say is overly conservative. |
| |
| The next steps of this tutorial will guide you to expand the fraud detector with more meaningful business logic. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { |
| |
| private static final double SMALL_AMOUNT = 1.00; |
| private static final double LARGE_AMOUNT = 500.00; |
| private static final long ONE_MINUTE = 60 * 1000; |
| |
| @Override |
| public void processElement( |
| Transaction transaction, |
| Context context, |
| Collector<Alert> collector) throws Exception { |
| |
| Alert alert = new Alert(); |
| alert.setId(transaction.getAccountId()); |
| |
| collector.collect(alert); |
| } |
| } |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| object FraudDetector { |
| val SMALL_AMOUNT: Double = 1.00 |
| val LARGE_AMOUNT: Double = 500.00 |
| val ONE_MINUTE: Long = 60 * 1000L |
| } |
| |
| @SerialVersionUID(1L) |
| class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { |
| |
| @throws[Exception] |
| def processElement( |
| transaction: Transaction, |
| context: KeyedProcessFunction[Long, Transaction, Alert]#Context, |
| collector: Collector[Alert]): Unit = { |
| |
| val alert = new Alert |
| alert.setId(transaction.getAccountId) |
| |
| collector.collect(alert) |
| } |
| } |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| ## Writing a Real Application (v1) |
| |
| For the first version, the fraud detector should output an alert for any account that makes a small transaction immediately followed by a large one. Where small is anything less than $1.00 and large is more than $500. |
| Imagine your fraud detector processes the following stream of transactions for a particular account. |
| |
| <p class="text-center"> |
| <img alt="Transactions" width="80%" src="{{ site.baseurl }}/fig/fraud-transactions.svg"/> |
| </p> |
| |
| Transactions 3 and 4 should be marked as fraudulent because it is a small transaction, $0.09, followed by a large one, $510. |
| Alternatively, transactions 7, 8, and 9 are not fraud because the small amount of $0.02 is not immediately followed by the large one; instead, there is an intermediate transaction that breaks the pattern. |
| |
| To do this, the fraud detector must _remember_ information across events; a large transaction is only fraudulent if the previous one was small. |
| Remembering information across events requires [state]({{ site.baseurl }}/concepts/glossary.html#managed-state), and that is why we decided to use a [KeyedProcessFunction]({{ site.baseurl }}/dev/stream/operators/process_function.html). |
| It provides fine-grained control over both state and time, which will allow us to evolve our algorithm with more complex requirements throughout this walkthrough. |
| |
| The most straightforward implementation is a boolean flag that is set whenever a small transaction is processed. |
| When a large transaction comes through, you can simply check if the flag is set for that account. |
| |
| However, merely implementing the flag as a member variable in the `FraudDetector` class will not work. |
| Flink processes the transactions of multiple accounts with the same object instance of `FraudDetector`, which means if accounts A and B are routed through the same instance of `FraudDetector`, a transaction for account A could set the flag to true and then a transaction for account B could set off a false alert. |
| We could of course use a data structure like a `Map` to keep track of the flags for individual keys, however, a simple member variable would not be fault-tolerant and all its information be lost in case of a failure. |
| Hence, the fraud detector would possibly miss alerts if the application ever had to restart to recover from a failure. |
| |
| To address these challenges, Flink provides primitives for fault-tolerant state that are almost as easy to use as regular member variables. |
| |
| The most basic type of state in Flink is [ValueState]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state), a data type that adds fault tolerance to any variable it wraps. |
| `ValueState` is a form of _keyed state_, meaning it is only available in operators that are applied in a _keyed context_; any operator immediately following `DataStream#keyBy`. |
| A _keyed state_ of an operator is automatically scoped to the key of the record that is currently processed. |
| In this example, the key is the account id for the current transaction (as declared by `keyBy()`), and `FraudDetector` maintains an independent state for each account. |
| `ValueState` is created using a `ValueStateDescriptor` which contains metadata about how Flink should manage the variable. The state should be registered before the function starts processing data. |
| The right hook for this is the `open()` method. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { |
| |
| private static final long serialVersionUID = 1L; |
| |
| private transient ValueState<Boolean> flagState; |
| |
| @Override |
| public void open(Configuration parameters) { |
| ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>( |
| "flag", |
| Types.BOOLEAN); |
| flagState = getRuntimeContext().getState(flagDescriptor); |
| } |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| @SerialVersionUID(1L) |
| class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { |
| |
| @transient private var flagState: ValueState[java.lang.Boolean] = _ |
| |
| @throws[Exception] |
| override def open(parameters: Configuration): Unit = { |
| val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN) |
| flagState = getRuntimeContext.getState(flagDescriptor) |
| } |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| `ValueState` is a wrapper class, similar to `AtomicReference` or `AtomicLong` in the Java standard library. |
| It provides three methods for interacting with its contents; `update` sets the state, `value` gets the current value, and `clear` deletes its contents. |
| If the state for a particular key is empty, such as at the beginning of an application or after calling `ValueState#clear`, then `ValueState#value` will return `null`. |
| Modifications to the object returned by `ValueState#value` are not guaranteed to be recognized by the system, and so all changes must be performed with `ValueState#update`. |
| Otherwise, fault tolerance is managed automatically by Flink under the hood, and so you can interact with it like with any standard variable. |
| |
| Below, you can see an example of how you can use a flag state to track potential fraudulent transactions. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| @Override |
| public void processElement( |
| Transaction transaction, |
| Context context, |
| Collector<Alert> collector) throws Exception { |
| |
| // Get the current state for the current key |
| Boolean lastTransactionWasSmall = flagState.value(); |
| |
| // Check if the flag is set |
| if (lastTransactionWasSmall != null) { |
| if (transaction.getAmount() > LARGE_AMOUNT) { |
| // Output an alert downstream |
| Alert alert = new Alert(); |
| alert.setId(transaction.getAccountId()); |
| |
| collector.collect(alert); |
| } |
| |
| // Clean up our state |
| flagState.clear(); |
| } |
| |
| if (transaction.getAmount() < SMALL_AMOUNT) { |
| // Set the flag to true |
| flagState.update(true); |
| } |
| } |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| override def processElement( |
| transaction: Transaction, |
| context: KeyedProcessFunction[Long, Transaction, Alert]#Context, |
| collector: Collector[Alert]): Unit = { |
| |
| // Get the current state for the current key |
| val lastTransactionWasSmall = flagState.value |
| |
| // Check if the flag is set |
| if (lastTransactionWasSmall != null) { |
| if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) { |
| // Output an alert downstream |
| val alert = new Alert |
| alert.setId(transaction.getAccountId) |
| |
| collector.collect(alert) |
| } |
| // Clean up our state |
| flagState.clear() |
| } |
| |
| if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) { |
| // set the flag to true |
| flagState.update(true) |
| } |
| } |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| For every transaction, the fraud detector checks the state of the flag for that account. |
| Remember, `ValueState` is always scoped to the current key, i.e., account. |
| If the flag is non-null, then the last transaction seen for that account was small, and so if the amount for this transaction is large, then the detector outputs a fraud alert. |
| |
| After that check, the flag state is unconditionally cleared. |
| Either the current transaction caused a fraud alert, and the pattern is over, or the current transaction did not cause an alert, and the pattern is broken and needs to be restarted. |
| |
| Finally, the transaction amount is checked to see if it is small. |
| If so, then the flag is set so that it can be checked by the next event. |
| Notice that `ValueState<Boolean>` actually has three states, unset ( `null`), `true`, and `false`, because all `ValueState`'s are nullable. |
| This job only makes use of unset ( `null`) and `true` to check whether the flag is set or not. |
| |
| ## Fraud Detector v2: State + Time = ❤️ |
| |
| Scammers don't wait long to make their large purchase to reduce the chances their test transaction is noticed. |
| For example, suppose you wanted to set a 1 minute timeout to your fraud detector; i.e., in the previous example transactions 3 and 4 would only be considered fraud if they occurred within 1 minute of each other. |
| Flink's `KeyedProcessFunction` allows you to set timers which invoke a callback method at some point in time in the future. |
| |
| Let's see how we can modify our Job to comply with our new requirements: |
| |
| * Whenever the flag is set to `true`, also set a timer for 1 minute in the future. |
| * When the timer fires, reset the flag by clearing its state. |
| * If the flag is ever cleared the timer should be canceled. |
| |
| To cancel a timer, you have to remember what time it is set for, and remembering implies state, so you will begin by creating a timer state along with your flag state. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| private transient ValueState<Boolean> flagState; |
| private transient ValueState<Long> timerState; |
| |
| @Override |
| public void open(Configuration parameters) { |
| ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>( |
| "flag", |
| Types.BOOLEAN); |
| flagState = getRuntimeContext().getState(flagDescriptor); |
| |
| ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>( |
| "timer-state", |
| Types.LONG); |
| timerState = getRuntimeContext().getState(timerDescriptor); |
| } |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| @SerialVersionUID(1L) |
| class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { |
| |
| @transient private var flagState: ValueState[java.lang.Boolean] = _ |
| @transient private var timerState: ValueState[java.lang.Long] = _ |
| |
| @throws[Exception] |
| override def open(parameters: Configuration): Unit = { |
| val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN) |
| flagState = getRuntimeContext.getState(flagDescriptor) |
| |
| val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG) |
| timerState = getRuntimeContext.getState(timerDescriptor) |
| } |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| `KeyedProcessFunction#processElement` is called with a `Context` that contains a timer service. |
| The timer service can be used to query the current time, register timers, and delete timers. |
| With this, you can set a timer for 1 minute in the future every time the flag is set and store the timestamp in `timerState`. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| if (transaction.getAmount() < SMALL_AMOUNT) { |
| // set the flag to true |
| flagState.update(true); |
| |
| // set the timer and timer state |
| long timer = context.timerService().currentProcessingTime() + ONE_MINUTE; |
| context.timerService().registerProcessingTimeTimer(timer); |
| timerState.update(timer); |
| } |
| {% endhighlight %} |
| </div> |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) { |
| // set the flag to true |
| flagState.update(true) |
| |
| // set the timer and timer state |
| val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE |
| context.timerService.registerProcessingTimeTimer(timer) |
| timerState.update(timer) |
| } |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| Processing time is wall clock time, and is determined by the system clock of the machine running the operator. |
| |
| When a timer fires, it calls `KeyedProcessFunction#onTimer`. |
| Overriding this method is how you can implement your callback to reset the flag. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| @Override |
| public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) { |
| // remove flag after 1 minute |
| timerState.clear(); |
| flagState.clear(); |
| } |
| {% endhighlight %} |
| </div> |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| override def onTimer( |
| timestamp: Long, |
| ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext, |
| out: Collector[Alert]): Unit = { |
| // remove flag after 1 minute |
| timerState.clear() |
| flagState.clear() |
| } |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| Finally, to cancel the timer, you need to delete the registered timer and delete the timer state. |
| You can wrap this in a helper method and call this method instead of `flagState.clear()`. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| private void cleanUp(Context ctx) throws Exception { |
| // delete timer |
| Long timer = timerState.value(); |
| ctx.timerService().deleteProcessingTimeTimer(timer); |
| |
| // clean up all state |
| timerState.clear(); |
| flagState.clear(); |
| } |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| @throws[Exception] |
| private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = { |
| // delete timer |
| val timer = timerState.value |
| ctx.timerService.deleteProcessingTimeTimer(timer) |
| |
| // clean up all states |
| timerState.clear() |
| flagState.clear() |
| } |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| And that's it, a fully functional, stateful, distributed streaming application! |
| |
| ## Final Application |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| package spendreport; |
| |
| import org.apache.flink.api.common.state.ValueState; |
| import org.apache.flink.api.common.state.ValueStateDescriptor; |
| import org.apache.flink.api.common.typeinfo.Types; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.streaming.api.functions.KeyedProcessFunction; |
| import org.apache.flink.util.Collector; |
| import org.apache.flink.walkthrough.common.entity.Alert; |
| import org.apache.flink.walkthrough.common.entity.Transaction; |
| |
| public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { |
| |
| private static final long serialVersionUID = 1L; |
| |
| private static final double SMALL_AMOUNT = 1.00; |
| private static final double LARGE_AMOUNT = 500.00; |
| private static final long ONE_MINUTE = 60 * 1000; |
| |
| private transient ValueState<Boolean> flagState; |
| private transient ValueState<Long> timerState; |
| |
| @Override |
| public void open(Configuration parameters) { |
| ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>( |
| "flag", |
| Types.BOOLEAN); |
| flagState = getRuntimeContext().getState(flagDescriptor); |
| |
| ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>( |
| "timer-state", |
| Types.LONG); |
| timerState = getRuntimeContext().getState(timerDescriptor); |
| } |
| |
| @Override |
| public void processElement( |
| Transaction transaction, |
| Context context, |
| Collector<Alert> collector) throws Exception { |
| |
| // Get the current state for the current key |
| Boolean lastTransactionWasSmall = flagState.value(); |
| |
| // Check if the flag is set |
| if (lastTransactionWasSmall != null) { |
| if (transaction.getAmount() > LARGE_AMOUNT) { |
| //Output an alert downstream |
| Alert alert = new Alert(); |
| alert.setId(transaction.getAccountId()); |
| |
| collector.collect(alert); |
| } |
| // Clean up our state |
| cleanUp(context); |
| } |
| |
| if (transaction.getAmount() < SMALL_AMOUNT) { |
| // set the flag to true |
| flagState.update(true); |
| |
| long timer = context.timerService().currentProcessingTime() + ONE_MINUTE; |
| context.timerService().registerProcessingTimeTimer(timer); |
| |
| timerState.update(timer); |
| } |
| } |
| |
| @Override |
| public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) { |
| // remove flag after 1 minute |
| timerState.clear(); |
| flagState.clear(); |
| } |
| |
| private void cleanUp(Context ctx) throws Exception { |
| // delete timer |
| Long timer = timerState.value(); |
| ctx.timerService().deleteProcessingTimeTimer(timer); |
| |
| // clean up all state |
| timerState.clear(); |
| flagState.clear(); |
| } |
| } |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| package spendreport |
| |
| import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} |
| import org.apache.flink.api.scala.typeutils.Types |
| import org.apache.flink.configuration.Configuration |
| import org.apache.flink.streaming.api.functions.KeyedProcessFunction |
| import org.apache.flink.util.Collector |
| import org.apache.flink.walkthrough.common.entity.Alert |
| import org.apache.flink.walkthrough.common.entity.Transaction |
| |
| object FraudDetector { |
| val SMALL_AMOUNT: Double = 1.00 |
| val LARGE_AMOUNT: Double = 500.00 |
| val ONE_MINUTE: Long = 60 * 1000L |
| } |
| |
| @SerialVersionUID(1L) |
| class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { |
| |
| @transient private var flagState: ValueState[java.lang.Boolean] = _ |
| @transient private var timerState: ValueState[java.lang.Long] = _ |
| |
| @throws[Exception] |
| override def open(parameters: Configuration): Unit = { |
| val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN) |
| flagState = getRuntimeContext.getState(flagDescriptor) |
| |
| val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG) |
| timerState = getRuntimeContext.getState(timerDescriptor) |
| } |
| |
| override def processElement( |
| transaction: Transaction, |
| context: KeyedProcessFunction[Long, Transaction, Alert]#Context, |
| collector: Collector[Alert]): Unit = { |
| |
| // Get the current state for the current key |
| val lastTransactionWasSmall = flagState.value |
| |
| // Check if the flag is set |
| if (lastTransactionWasSmall != null) { |
| if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) { |
| // Output an alert downstream |
| val alert = new Alert |
| alert.setId(transaction.getAccountId) |
| |
| collector.collect(alert) |
| } |
| // Clean up our state |
| cleanUp(context) |
| } |
| |
| if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) { |
| // set the flag to true |
| flagState.update(true) |
| val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE |
| |
| context.timerService.registerProcessingTimeTimer(timer) |
| timerState.update(timer) |
| } |
| } |
| |
| override def onTimer( |
| timestamp: Long, |
| ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext, |
| out: Collector[Alert]): Unit = { |
| // remove flag after 1 minute |
| timerState.clear() |
| flagState.clear() |
| } |
| |
| @throws[Exception] |
| private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = { |
| // delete timer |
| val timer = timerState.value |
| ctx.timerService.deleteProcessingTimeTimer(timer) |
| |
| // clean up all states |
| timerState.clear() |
| flagState.clear() |
| } |
| } |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| ### Expected Output |
| |
| Running this code with the provided `TransactionSource` will emit fraud alerts for account 3. |
| You should see the following output in your task manager logs: |
| |
| {% highlight bash %} |
| 2019-08-19 14:22:06,220 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} |
| 2019-08-19 14:22:11,383 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} |
| 2019-08-19 14:22:16,551 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} |
| 2019-08-19 14:22:21,723 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} |
| 2019-08-19 14:22:26,896 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} |
| {% endhighlight %} |