Often, applications require some initial state provided by historical data in a file, database, or other system. Because state is managed by Apache Flink's snapshotting mechanism, bootstrapping a Stateful Functions application means writing the initial state into a savepoint that can be used to start the job. Users can bootstrap initial state for Stateful Functions applications using Flink's State Processor API and a `StatefulFunctionsSavepointCreator`.
To get started, include the following libraries in your application:
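{% highlight xml %}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-flink-state-processor</artifactId>
    <version>{{ site.version }}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_{{ site.scala_version }}</artifactId>
    <version>{{ site.flink_version }}</version>
</dependency>
{% endhighlight %}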
A `StateBootstrapFunction` defines how to bootstrap state for a `StatefulFunction` instance with a given input.
Each bootstrap function instance directly corresponds to a `StatefulFunction` type. Likewise, each instance is uniquely identified by an address, represented by the type and id of the function being bootstrapped. Any state that is persisted by a bootstrap function instance will be available to the corresponding live `StatefulFunction` instance having the same address.
For example, consider the following state bootstrap function:
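{% highlight java %}
public class MyStateBootstrapFunction implements StateBootstrapFunction {

    // Persisted state; the live StatefulFunction must declare state with the same name ("my-state")
    @Persisted
    private PersistedValue<MyState> state = PersistedValue.of("my-state", MyState.class);

    @Override
    public void bootstrap(Context context, Object input) {
        // extractStateFromInput is your own logic that converts a bootstrap record into MyState
        state.set(extractStateFromInput(input));
    }
}
{% endhighlight %}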
Assume that this bootstrap function was provided for function type `MyFunctionType`, and the id of the bootstrap function instance was `id-13`. The function writes persisted state of name `my-state` using the given bootstrap data. After restoring a Stateful Functions application from the savepoint generated using this bootstrap function, the stateful function instance with address `(MyFunctionType, id-13)` will already have state values available under state name `my-state`.
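To make the address matching concrete, here is a deliberately simplified, self-contained model. It is not how Flink stores state: the `ToyAddress` and `ToyStateStore` classes are hypothetical stand-ins for a function address and for the savepoint. The sketch only illustrates that a value bootstrapped for a given address and state name is exactly what the live instance with the same address reads after restore, while other ids or state names see nothing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for a function address: (function type, instance id).
final class ToyAddress {
    final String functionType;
    final String id;

    ToyAddress(String functionType, String id) {
        this.functionType = functionType;
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ToyAddress)) return false;
        ToyAddress other = (ToyAddress) o;
        return functionType.equals(other.functionType) && id.equals(other.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(functionType, id);
    }
}

// Hypothetical stand-in for the savepoint: state values keyed by address, then by state name.
final class ToyStateStore {
    private final Map<ToyAddress, Map<String, Object>> state = new HashMap<>();

    // Bootstrap phase: a bootstrap function instance writes state for its own address.
    void bootstrap(ToyAddress address, String stateName, Object value) {
        state.computeIfAbsent(address, a -> new HashMap<>()).put(stateName, value);
    }

    // Restore phase: the live function instance with the same address reads the state back.
    Optional<Object> restore(ToyAddress address, String stateName) {
        return Optional.ofNullable(state.getOrDefault(address, Map.of()).get(stateName));
    }
}
```

For example, bootstrapping `("MyFunctionType", "id-13")` with a value under `my-state` makes that value visible only to `restore(new ToyAddress("MyFunctionType", "id-13"), "my-state")`.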
Savepoints are created by defining certain metadata, such as max parallelism and state backend. The default state backend is RocksDB.
{% highlight java %}
int maxParallelism = 128;
StatefulFunctionsSavepointCreator newSavepoint = new StatefulFunctionsSavepointCreator(maxParallelism);
{% endhighlight %}
Each input data set is registered in the savepoint creator with a [router]({{ site.baseurl }}/io-module/index.html#router) that routes each record to zero or more function instances. You may then register any number of function types to the savepoint creator, similar to how functions are registered within a stateful functions module. Finally, specify an output location for the resulting savepoint.
{% highlight java %}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.statefun.sdk.FunctionType;

// Read data from a file, database, or other location
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

final DataSet<Tuple2<String, Integer>> userSeenCounts = env.fromElements(
    Tuple2.of("foo", 4), Tuple2.of("bar", 3), Tuple2.of("joe", 2));

// Register the dataset with a router
newSavepoint.withBootstrapData(userSeenCounts, MyStateBootstrapFunctionRouter::new);

// Register a bootstrap function to process the records
newSavepoint.withStateBootstrapFunctionProvider(
    new FunctionType("apache", "my-function"),
    ignored -> new MyStateBootstrapFunction());

// Specify the output location of the savepoint
newSavepoint.write("file:///savepoint/path/");

// Run the batch job that writes the savepoint
env.execute();
{% endhighlight %}
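The example registers `MyStateBootstrapFunctionRouter` without showing it. In a real application it would implement the `Router` interface from the Stateful Functions SDK. So that it runs on its own, the following sketch instead uses hypothetical stand-ins (`UserSeenCount`, `ToyDownstream`, `ToyRouter`, `SketchUserRouter`, `RoutingSketch`) in place of the SDK and Flink types. It assumes each `(user, count)` record should go to the `apache/my-function` instance whose id is the user name, and the downstream lambda plays the role of the bootstrap function by storing the count under that address.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one bootstrap record, playing the role of Tuple2<String, Integer>.
final class UserSeenCount {
    final String user;
    final int count;

    UserSeenCount(String user, int count) {
        this.user = user;
        this.count = count;
    }
}

// Hypothetical stand-in for the router's downstream: delivers a record to (function type, id).
interface ToyDownstream<T> {
    void forward(String functionType, String id, T record);
}

// Hypothetical stand-in for a router: called once per record, may forward it zero or more times.
interface ToyRouter<T> {
    void route(T record, ToyDownstream<T> downstream);
}

// Routes each record to the "apache/my-function" instance whose id is the user name.
final class SketchUserRouter implements ToyRouter<UserSeenCount> {
    @Override
    public void route(UserSeenCount record, ToyDownstream<UserSeenCount> downstream) {
        downstream.forward("apache/my-function", record.user, record);
    }
}

final class RoutingSketch {
    // Runs the router over all records. The downstream lambda stands in for the bootstrap
    // function: it stores each record's count under the address the record was routed to.
    static Map<String, Integer> bootstrapAll(List<UserSeenCount> records, ToyRouter<UserSeenCount> router) {
        Map<String, Integer> stateByAddress = new HashMap<>();
        for (UserSeenCount record : records) {
            router.route(record, (functionType, id, r) -> stateByAddress.put(functionType + "/" + id, r.count));
        }
        return stateByAddress;
    }
}
```

With the three records from the example, this yields one entry per user, such as `apache/my-function/foo` holding `4`. A router that forwards nothing filters a record out, and one that calls `forward` several times fans it out to multiple instances.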
For full details of how to use Flink's `DataSet` API, please check the official documentation.
Once written, the new savepoint can be used to provide the initial state for a Stateful Functions application.