---
title: "Queryable State"
nav-parent_id: streaming
nav-pos: 61
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
* ToC
{:toc}
<div class="alert alert-warning">
<strong>Note:</strong> The client APIs for queryable state are currently in an evolving state and
there are <strong>no guarantees</strong> made about stability of the provided interfaces. It is
likely that there will be breaking API changes on the client side in the upcoming Flink versions.
</div>
In a nutshell, this feature allows users to query Flink's managed partitioned state
(see [Working with State]({{ site.baseurl }}/dev/stream/state.html)) from outside of
Flink. In some scenarios, queryable state thus eliminates the need for distributed
operations/transactions with external systems such as key-value stores, which are often the
bottleneck in practice.
<div class="alert alert-warning">
<strong>Attention:</strong> Queryable state accesses keyed state from a concurrent thread rather
than synchronizing with the operator and potentially blocking its operation. State backends that
keep state on the Java heap, e.g. MemoryStateBackend or FsStateBackend, do not hand out copies
when retrieving values but reference the stored objects directly. Read-modify-write patterns are
therefore unsafe and may cause the queryable state server to fail due to concurrent modifications.
The RocksDBStateBackend is safe from these issues.
</div>
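
The difference between handing out the stored object and handing out a copy can be illustrated without Flink. The following is a simplified sketch, not Flink code: `Store`, `HeapLikeStore`, `CopyingStore` and `AliasingDemo` are hypothetical names. `HeapLikeStore` stands in for a heap-based backend, while `CopyingStore` stands in for a backend that keeps only serialized bytes (such as RocksDB) and therefore effectively returns a fresh object on every read. In a real job the reader is the queryable state server serializing the value on another thread while the operator mutates it; here the accesses are interleaved sequentially to keep the example deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal state store interface (not a Flink API).
interface Store {
    void put(String key, long[] value);
    long[] get(String key);
}

// Stand-in for a heap-based backend: get() hands out the stored object itself.
final class HeapLikeStore implements Store {
    private final Map<String, long[]> values = new HashMap<>();
    public void put(String key, long[] value) { values.put(key, value); }
    public long[] get(String key) { return values.get(key); }
}

// Stand-in for a copying backend: reads and writes go through copies,
// similar in spirit to (de)serializing on every access.
final class CopyingStore implements Store {
    private final Map<String, long[]> values = new HashMap<>();
    public void put(String key, long[] value) { values.put(key, value.clone()); }
    public long[] get(String key) {
        long[] stored = values.get(key);
        return stored == null ? null : stored.clone();
    }
}

final class AliasingDemo {
    // Returns what a reader that fetched the value *before* the update sees afterwards.
    static long[] readerViewAfterUpdate(Store store) {
        store.put("k", new long[] {1L, 10L});  // (count, sum)
        long[] seenByQuery = store.get("k");   // "query thread" reads the value
        long[] current = store.get("k");       // operator reads the value ...
        current[0] += 1;                       // ... modifies it in place ...
        current[1] += 5;
        store.put("k", current);               // ... and writes it back
        return seenByQuery;
    }
}
```

With `HeapLikeStore` the reader's array changes underneath it to `{2, 15}`; with `CopyingStore` it keeps `{1, 10}`.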
## Making State Queryable
In order to make state queryable, the queryable state server first needs to be enabled globally
by setting the `query.server.enable` configuration parameter to `true` (the current default).
Then the appropriate state needs to be made queryable by using either
* a `QueryableStateStream`, a convenience object which behaves like a sink and offers incoming values as
queryable state, or
* `StateDescriptor#setQueryable(String queryableStateName)`, which makes the keyed state of an
operator queryable.
The following sections explain the use of these two approaches.
### Queryable State Stream
A `KeyedStream` may offer its values as queryable state by using the following methods:
{% highlight java %}
// ValueState
QueryableStateStream asQueryableState(
String queryableStateName,
ValueStateDescriptor stateDescriptor)
// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)
// FoldingState
QueryableStateStream asQueryableState(
String queryableStateName,
FoldingStateDescriptor stateDescriptor)
// ReducingState
QueryableStateStream asQueryableState(
String queryableStateName,
ReducingStateDescriptor stateDescriptor)
{% endhighlight %}
<div class="alert alert-info">
<strong>Note:</strong> There is no queryable <code>ListState</code> sink as it would result in an ever-growing
list which may not be cleaned up and thus will eventually consume too much memory.
</div>
A call to these methods returns a `QueryableStateStream`, which currently only holds the name as
well as the key and value serializers of the queryable state stream. It is comparable to a sink and
cannot be followed by further transformations.
Internally, a `QueryableStateStream` is translated to an operator which uses all incoming
records to update the queryable state instance.
In a program like the following, all records of the keyed stream will be used to update the state
instance, either via `ValueState#update(value)` or `AppendingState#add(value)`, depending on
the chosen state variant:
{% highlight java %}
stream.keyBy(0).asQueryableState("query-name");
{% endhighlight %}
This acts like the Scala API's `flatMapWithState`.
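
To make the per-record update concrete, the following sketch mimics with plain Java collections what the translated operator keeps per key: the value-state variant overwrites the stored value with each record, while the reducing variant combines each record with the previous value. `QueryableStateSimulation` is a hypothetical name used only for illustration; it is not how Flink implements the operator.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical simulation of the per-key state behind a QueryableStateStream.
// Not Flink code: it only mirrors the update semantics described above.
final class QueryableStateSimulation {

    // ValueState variant: every record replaces the stored value (cf. ValueState#update).
    static <K, V> Map<K, V> valueStateView(List<Map.Entry<K, V>> records) {
        Map<K, V> state = new HashMap<>();
        for (Map.Entry<K, V> record : records) {
            state.put(record.getKey(), record.getValue());
        }
        return state;
    }

    // ReducingState variant: every record is combined with the stored value
    // using the reduce function (cf. AppendingState#add).
    static <K, V> Map<K, V> reducingStateView(List<Map.Entry<K, V>> records, BinaryOperator<V> reduce) {
        Map<K, V> state = new HashMap<>();
        for (Map.Entry<K, V> record : records) {
            state.merge(record.getKey(), record.getValue(), reduce);
        }
        return state;
    }
}
```

For the records `(a, 1), (a, 2), (b, 5)`, the value variant ends with `a=2, b=5`, whereas a reducing variant using `Integer::sum` ends with `a=3, b=5`.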
### Managed Keyed State
Managed keyed state of an operator
(see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state.html#using-managed-keyed-state))
can be made queryable by making the appropriate state descriptor queryable via
`StateDescriptor#setQueryable(String queryableStateName)`, as in the example below:
{% highlight java %}
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
descriptor.setQueryable("query-name"); // queryable state name
{% endhighlight %}
<div class="alert alert-info">
<strong>Note:</strong> The <code>queryableStateName</code> parameter may be chosen arbitrarily and is only
used for queries. It does not have to be identical to the state's own name.
</div>
## Querying State
The `QueryableStateClient` helper class may be used for queries against the `KvState` instances that
serve the state internally. It needs to be set up with a valid `JobManager` address and port and is
created as follows:
{% highlight java %}
// queryAddress and queryPort are placeholders for the JobManager's address and port
final Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, queryAddress);
config.setInteger(JobManagerOptions.PORT, queryPort);
final HighAvailabilityServices highAvailabilityServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
Executors.newSingleThreadScheduledExecutor(),
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
{% endhighlight %}
The query method has the following signature:
{% highlight java %}
Future<byte[]> getKvState(
JobID jobID,
String queryableStateName,
int keyHashCode,
byte[] serializedKeyAndNamespace)
{% endhighlight %}
A call to this method returns a `Future` eventually holding the serialized state value for the
queryable state instance identified by `queryableStateName` of the job with ID `jobID`. The
`keyHashCode` is the hash code of the key as returned by `Object.hashCode()` and the
`serializedKeyAndNamespace` is the serialized key and namespace.
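
Since the hash code and the serialized key are passed separately, both must be derived from a key of exactly the type the job uses. The JDK-only sketch below shows why the type matters: a `Long` and an `Integer` holding 42 share a hash code but serialize to byte arrays of different lengths. `KeyHashing` is a hypothetical helper, and the fixed-width, big-endian encoding via `DataOutputStream` is an assumption for illustration, not the exact byte layout produced by Flink's serializers.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper illustrating keyHashCode and key encoding (not a Flink API).
final class KeyHashing {

    // The value passed as keyHashCode: simply Object#hashCode() of the key.
    static int keyHashCode(Object key) {
        return key.hashCode();
    }

    // Illustrative encoding of a Long key: 8 bytes, big-endian.
    static byte[] encodeLong(long key) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(key);
        } catch (IOException e) {
            // Cannot happen for an in-memory stream, but DataOutputStream declares it.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Illustrative encoding of an Integer key: 4 bytes, big-endian.
    static byte[] encodeInt(int key) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(key);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Note that the hash code is not always the key's value: `Long.valueOf(-1L).hashCode()` is `0`, so always compute it with `hashCode()` on the key object rather than deriving it by hand.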
<div class="alert alert-info">
<strong>Note:</strong> The client is asynchronous and can be shared by multiple threads. It needs
to be shut down via <code>QueryableStateClient#shutdown()</code> when no longer used in order to
free resources.
</div>
The current implementation is still fairly low-level in the sense that it only works with
serialized data, both for providing the key/namespace and for the returned results. It is the
responsibility of the user (or some follow-up utilities) to set up the serializers for this. The
upside is that the query services do not have to worry about any class loading issues.
`KvStateRequestSerializer` includes some utilities for key/namespace and value (de)serialization.
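
Because results come back as plain bytes, the client must decode them exactly as the job encoded them. The sketch below shows such a round trip for a `(count, sum)` pair using only the JDK. `CountSumCodec` is a hypothetical name and its layout (two big-endian `long`s, 16 bytes) is an assumption for illustration; a real client should use `KvStateRequestSerializer.deserializeValue` with the job's `TypeSerializer`, as in the example that follows.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical codec for a (count, sum) pair; not Flink's serialization format.
final class CountSumCodec {

    // Encodes the pair as two big-endian longs.
    static byte[] serialize(long count, long sum) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(count);
            out.writeLong(sum);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decodes {count, sum}; the reader must mirror the writer field by field.
    static long[] deserialize(byte[] serialized) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            long count = in.readLong();
            long sum = in.readLong();
            return new long[] {count, sum};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```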
### Example
The following example extends the `CountWindowAverage` example
(see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state.html#using-managed-keyed-state))
by making it queryable and showing how to query this value:
{% highlight java %}
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
Tuple2<Long, Long> currentSum = sum.value();
currentSum.f0 += 1;
currentSum.f1 += input.f1;
sum.update(currentSum);
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
descriptor.setQueryable("query-name");
sum = getRuntimeContext().getState(descriptor);
}
}
{% endhighlight %}
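
To see what a query against `query-name` could observe at different points, the following JDK-only sketch replays the `flatMap` logic above for individual keys. `CountWindowAverageSimulation` is a hypothetical helper, not part of Flink; it models the state as a `long[] {count, sum}` per key, and `null` stands for a key with no stored state (e.g. after `clear()`).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of CountWindowAverage's keyed state (not Flink code).
final class CountWindowAverageSimulation {
    // Per-key (count, sum); an absent entry means nothing is stored for the key.
    private final Map<Long, long[]> state = new HashMap<>();
    // Emitted (key, average) pairs.
    final List<long[]> output = new ArrayList<>();

    // Mirrors CountWindowAverage#flatMap for one (key, value) input.
    void process(long key, long value) {
        long[] current = state.getOrDefault(key, new long[] {0L, 0L}); // default (0, 0)
        current[0] += 1;
        current[1] += value;
        state.put(key, current);
        if (current[0] >= 2) {
            // Integer division, as in the Java example above.
            output.add(new long[] {key, current[1] / current[0]});
            state.remove(key); // corresponds to sum.clear()
        }
    }

    // What a point query for the key would see in this model; null if no state is stored.
    long[] query(long key) {
        long[] current = state.get(key);
        return current == null ? null : current.clone();
    }
}
```

In this model, after the input `(1, 3)` a query for key `1` sees `(1, 3)`; after `(1, 5)` the average `(1, 4)` is emitted and the key has no state until the next input arrives.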
Once used in a job, you can retrieve the job ID and then query any key's current state from this operator:
{% highlight java %}
// jobId, key (a Long), queryAddress and queryPort are placeholders provided by your code
final Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, queryAddress);
config.setInteger(JobManagerOptions.PORT, queryPort);
final HighAvailabilityServices highAvailabilityServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
Executors.newSingleThreadScheduledExecutor(),
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
// the serializers must match the key and value types used by the job
final TypeSerializer<Long> keySerializer =
TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig());
final TypeSerializer<Tuple2<Long, Long>> valueSerializer =
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}).createSerializer(new ExecutionConfig());
// the state is not scoped to a window, hence the void namespace
final byte[] serializedKey =
KvStateRequestSerializer.serializeKeyAndNamespace(
key, keySerializer,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
Future<byte[]> serializedResult =
client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);
// now wait (at most one second) for the result and deserialize it
final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
byte[] serializedValue = Await.result(serializedResult, duration);
Tuple2<Long, Long> value =
KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
// free the client's resources once it is no longer needed
client.shutdown();
{% endhighlight %}
### Note for Scala Users
Please use the available Scala extensions when creating the `TypeSerializer` instances. Add the following import:
```scala
import org.apache.flink.streaming.api.scala._
```
Now you can create the type serializers as follows:
```scala
val keySerializer = createTypeInformation[Long]
.createSerializer(new ExecutionConfig)
```
If you don't do this, you can run into mismatches between the serializers used in the Flink job and in your client code, because types like `scala.Long` cannot be captured at runtime.
## Configuration
The following configuration parameters influence the behaviour of the queryable state server and client.
They are defined in `QueryableStateOptions`.
### Server
* `query.server.enable`: flag to indicate whether to start the queryable state server
* `query.server.port`: port the internal `KvStateServer` binds to (0 => pick a random available port)
* `query.server.network-threads`: number of network (event loop) threads for the `KvStateServer` (0 => #slots)
* `query.server.query-threads`: number of asynchronous query threads for the `KvStateServerHandler` (0 => #slots)
### Client (`QueryableStateClient`)
* `query.client.network-threads`: number of network (event loop) threads for the `KvStateClient` (0 => number of available cores)
* `query.client.lookup.num-retries`: number of retries on location lookup failures
* `query.client.lookup.retry-delay`: retry delay on location lookup failures (millis)
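
The two lookup settings describe a retry loop around the state location lookup. The sketch below shows such a loop under stated assumptions: `num-retries` is taken to count retries after the first attempt, and the delay is applied between attempts. `LookupRetry.lookupWithRetries` is a hypothetical helper, not Flink's implementation.

```java
import java.util.function.Supplier;

// Hypothetical retry helper mirroring the meaning of the client lookup settings.
final class LookupRetry {

    // One initial attempt plus up to numRetries retries, sleeping retryDelayMillis in between.
    static <T> T lookupWithRetries(Supplier<T> lookup, int numRetries, long retryDelayMillis) {
        if (numRetries < 0) {
            throw new IllegalArgumentException("numRetries must be >= 0");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt <= numRetries; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt < numRetries) {
                    try {
                        Thread.sleep(retryDelayMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("Interrupted while waiting to retry", ie);
                    }
                }
            }
        }
        // All attempts failed: surface the last failure to the caller.
        throw lastFailure;
    }
}
```

Under these assumptions, a lookup that fails twice before succeeding needs `num-retries` of at least 2.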
## Limitations
* The queryable state life-cycle is bound to the life-cycle of the job, i.e. tasks register
queryable state on startup and unregister it on disposal. In future versions, it is desirable to
decouple this in order to allow queries after a task finishes, and to speed up recovery via state
replication.
* Notifications about available KvState happen via a simple fire-and-forget tell. In the future this
should be made more robust with asks and acknowledgements.
* The server and client keep track of statistics for queries. These are currently disabled by
default as they are not exposed anywhere. As soon as there is better support to publish these
numbers via the Metrics system, the statistics should be enabled.