title: "Working with State"
nav-parent_id: streaming_state
nav-pos: 1

This document explains how to use Flink's state abstractions when developing an application.

  • ToC {:toc}

Keyed State and Operator State

There are two basic kinds of state in Flink: Keyed State and Operator State.

Keyed State

Keyed State is always relative to keys and can only be used in functions and operators on a KeyedStream.

You can think of Keyed State as Operator State that has been partitioned, or sharded, with exactly one state-partition per key. Each keyed-state is logically bound to a unique composite of <parallel-operator-instance, key>, and since each key “belongs” to exactly one parallel instance of a keyed operator, we can think of this simply as <operator, key>.

Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups.
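The relationship between keys, Key Groups and parallel instances can be illustrated with a minimal sketch. It assumes a simplified assignment (key hash modulo the maximum parallelism, followed by a proportional mapping of Key Groups to instances). The class and method names are hypothetical, and the real runtime may hash keys differently; the point is only that a key always stays in the same Key Group, while whole Key Groups move between instances when the parallelism changes.

```java
public class KeyGroupSketch {

    // Simplified assumption: map a key to one of maxParallelism Key Groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Map a Key Group to the parallel instance that owns it (contiguous ranges).
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        for (int parallelism : new int[] {2, 4}) {
            StringBuilder line = new StringBuilder("parallelism " + parallelism + ":");
            for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
                line.append(" kg").append(keyGroup).append("->")
                    .append(operatorIndexFor(keyGroup, maxParallelism, parallelism));
            }
            System.out.println(line);
        }
    }
}
```

With a maximum parallelism of 8, rescaling from 2 to 4 instances in this sketch splits each instance's range of four Key Groups into two ranges of two; no Key Group is ever split.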

Operator State

With Operator State (or non-keyed state), each operator state is bound to one parallel operator instance. The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good motivating example for the use of Operator State in Flink. Each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State.

The Operator State interfaces support redistributing state among parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution.

Raw and Managed State

Keyed State and Operator State exist in two forms: managed and raw.

Managed State is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB. Examples are “ValueState”, “ListState”, etc. Flink's runtime encodes the states and writes them into the checkpoints.

Raw State is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into the checkpoint. Flink knows nothing about the state's data structures and sees only the raw bytes.

All datastream functions can use managed state, but the raw state interfaces can only be used when implementing operators. Using managed state (rather than raw state) is recommended, since with managed state Flink is able to automatically redistribute state when the parallelism is changed, and also do better memory management.

Attention If your managed state needs custom serialization logic, please see the corresponding guide in order to ensure future compatibility. Flink's default serializers don't need special treatment.

Using Managed Keyed State

The managed keyed state interface provides access to different types of state that are all scoped to the key of the current input element. This means that this type of state can only be used on a KeyedStream, which can be created via stream.keyBy(…).

Now, we will first look at the different types of state available and then we will see how they can be used in a program. The available state primitives are:

  • ValueState<T>: This keeps a value that can be updated and retrieved (scoped to key of the input element as mentioned above, so there will possibly be one value for each key that the operation sees). The value can be set using update(T) and retrieved using T value().

  • ListState<T>: This keeps a list of elements. You can append elements and retrieve an Iterable over all currently stored elements. Elements are added using add(T) or addAll(List<T>); the Iterable can be retrieved using Iterable<T> get(). You can also overwrite the existing list with update(List<T>).

  • ReducingState<T>: This keeps a single value that represents the aggregation of all values added to the state. The interface is similar to ListState but elements added using add(T) are reduced to an aggregate using a specified ReduceFunction.

  • AggregatingState<IN, OUT>: This keeps a single value that represents the aggregation of all values added to the state. Contrary to ReducingState, the aggregate type may be different from the type of elements that are added to the state. The interface is the same as for ListState but elements added using add(IN) are aggregated using a specified AggregateFunction.

  • FoldingState<T, ACC>: This keeps a single value that represents the aggregation of all values added to the state. Contrary to ReducingState, the aggregate type may be different from the type of elements that are added to the state. The interface is similar to ListState but elements added using add(T) are folded into an aggregate using a specified FoldFunction.

  • MapState<UK, UV>: This keeps a list of mappings. You can put key-value pairs into the state and retrieve an Iterable over all currently stored mappings. Mappings are added using put(UK, UV) or putAll(Map<UK, UV>). The value associated with a user key can be retrieved using get(UK). The iterable views for mappings, keys and values can be retrieved using entries(), keys() and values() respectively.

  • SortedMapState<UK, UV>: This keeps a sorted list of mappings. You can put key-value pairs into the state and retrieve an Iterable over all currently stored mappings. Mappings are added using put(UK, UV) or putAll(Map<UK, UV>). The value associated with a user key can be retrieved using get(UK). The iterable views for mappings, keys and values can be retrieved using entries(), keys() and values() respectively. The first and last entries of the stored mappings can be retrieved using firstEntry() and lastEntry() respectively. The head, tail and sub iterable views of the current sorted mappings can be retrieved using headIterator(), tailIterator() and subIterator(). Be careful when using SortedMapState: for RocksDBStateBackend, only BytewiseComparator is supported. The comparison result of BytewiseComparator on heap objects is consistent with the comparison result after serialization. Taking numbers as an example, the comparison result of the serialized forms is consistent with that of the numbers only when both numbers are non-negative. The serializers under org.apache.flink.table.typeutils.ordered can help if you want to use SortedMapState.
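The caveat about negative numbers under a bytewise comparison can be seen in a small self-contained sketch. It assumes a plain big-endian encoding of a long and an unsigned, lexicographic byte comparison; the class and method names are hypothetical. Flipping the sign bit before encoding is one common order-preserving technique, shown here only as an illustration and not as a description of what the serializers mentioned above do.

```java
import java.nio.ByteBuffer;

public class BytewiseOrderSketch {

    // Plain big-endian encoding of a long (8 bytes).
    static byte[] encodePlain(long v) {
        return ByteBuffer.allocate(8).putLong(v).array();
    }

    // Big-endian encoding with the sign bit flipped, so negative values sort first.
    static byte[] encodeOrdered(long v) {
        return ByteBuffer.allocate(8).putLong(v ^ Long.MIN_VALUE).array();
    }

    // Lexicographic comparison that treats every byte as unsigned.
    static int compareBytewise(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        // Non-negative numbers: byte order agrees with numeric order.
        System.out.println(compareBytewise(encodePlain(3L), encodePlain(5L)) < 0);      // true
        // Mixed signs: -1 encodes as 0xFF..FF and sorts after 1.
        System.out.println(compareBytewise(encodePlain(-1L), encodePlain(1L)) < 0);     // false
        // With the sign bit flipped, byte order agrees with numeric order again.
        System.out.println(compareBytewise(encodeOrdered(-1L), encodeOrdered(1L)) < 0); // true
    }
}
```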

All types of state also have a method clear() that clears the state for the currently active key, i.e. the key of the input element.

Attention FoldingState and FoldingStateDescriptor have been deprecated in Flink 1.4 and will be completely removed in the future. Please use AggregatingState and AggregatingStateDescriptor instead.
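To illustrate how an aggregation whose result type differs from its input type looks in the accumulator style, here is a minimal plain-Java sketch. The `Aggregate` interface is a local stand-in that mirrors the shape of an aggregate function with an input, an accumulator and an output type (the merge step is omitted for brevity); it is not Flink's own interface, and all names are hypothetical.

```java
import java.util.Arrays;

public class AggregateSketch {

    // Local stand-in: IN is the element type, ACC the accumulator, OUT the result.
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
    }

    // Averages Long inputs into a Double; the accumulator is {sum, count}.
    static final Aggregate<Long, long[], Double> AVERAGE = new Aggregate<Long, long[], Double>() {
        @Override
        public long[] createAccumulator() {
            return new long[] {0L, 0L};
        }

        @Override
        public long[] add(Long value, long[] acc) {
            acc[0] += value;
            acc[1] += 1;
            return acc;
        }

        @Override
        public Double getResult(long[] acc) {
            return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
        }
    };

    // Feeds all values through the aggregate, as repeated add(IN) calls on the state would.
    static <IN, ACC, OUT> OUT aggregateAll(Aggregate<IN, ACC, OUT> fn, Iterable<IN> values) {
        ACC acc = fn.createAccumulator();
        for (IN value : values) {
            acc = fn.add(value, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(aggregateAll(AVERAGE, Arrays.asList(3L, 5L, 7L))); // 5.0
    }
}
```

A fold with an initial value and a fold function maps onto this shape directly: the initial value becomes createAccumulator() and the fold function becomes add().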

It is important to keep in mind that these state objects are only used for interfacing with state. The state is not necessarily stored inside but might reside on disk or somewhere else. The second thing to keep in mind is that the value you get from the state depends on the key of the input element. So the value you get in one invocation of your user function can differ from the value in another invocation if the keys involved are different.

To get a state handle, you have to create a StateDescriptor. This holds the name of the state (as we will see later, you can create several states, and they have to have unique names so that you can reference them), the type of the values that the state holds, and possibly a user-specified function, such as a ReduceFunction. Depending on what type of state you want to retrieve, you create either a ValueStateDescriptor, a ListStateDescriptor, a ReducingStateDescriptor, a FoldingStateDescriptor, an AggregatingStateDescriptor, a MapStateDescriptor or a SortedMapStateDescriptor.

State is accessed using the RuntimeContext, so it is only possible in rich functions. Please see [here]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) for information about that, but we will also see an example shortly. The RuntimeContext that is available in a RichFunction has these methods for accessing state:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
  • SortedMapState<UK, UV> getSortedMapState(SortedMapStateDescriptor<UK, UV>)

This is an example FlatMapFunction that shows how all of the parts fit together:

{% highlight java %}
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)
{% endhighlight %}

{% highlight scala %}
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count and the running sum
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}

object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleManagedState")
}
{% endhighlight %}

This example implements a poor man's counting window. We key the tuples by the first field (in the example all have the same key 1). The function stores the count and a running sum in a ValueState. Once the count reaches 2 it will emit the average and clear the state so that we start over from 0. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field.
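The behaviour of this counting window can be traced without a cluster using a small plain-Java model. In this sketch a HashMap keyed by the first tuple field stands in for the keyed ValueState; the class and method names are hypothetical and nothing here uses the Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountWindowModel {

    // Per-key {count, sum}; stands in for the keyed ValueState of the example.
    private final Map<Long, long[]> statePerKey = new HashMap<>();

    // Processes one (key, value) element and returns the emitted (key, average) pairs.
    List<long[]> process(long key, long value) {
        List<long[]> out = new ArrayList<>();
        long[] current = statePerKey.getOrDefault(key, new long[] {0L, 0L});
        current[0] += 1;      // update the count
        current[1] += value;  // add to the running sum
        statePerKey.put(key, current);
        if (current[0] >= 2) {
            out.add(new long[] {key, current[1] / current[0]});
            statePerKey.remove(key); // corresponds to clear() for the current key
        }
        return out;
    }

    // Runs all elements through the model and formats the emitted tuples.
    static List<String> run(long[][] input) {
        CountWindowModel model = new CountWindowModel();
        List<String> printed = new ArrayList<>();
        for (long[] element : input) {
            for (long[] result : model.process(element[0], element[1])) {
                printed.add("(" + result[0] + "," + result[1] + ")");
            }
        }
        return printed;
    }

    public static void main(String[] args) {
        long[][] input = {{1, 3}, {1, 5}, {1, 7}, {1, 4}, {1, 2}};
        System.out.println(run(input)); // [(1,4), (1,5)]
    }
}
```

The averages use integer division, which is why the second window (7 and 4) yields 5. The last element (1, 2) stays in the state because its window never reaches a count of 2.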

The following example shows how to use SortedMapState's firstEntry to retrieve the first entry of current state.

{% highlight java %}
public class CountWindowFirstEntry extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>> {

    /**
     * The SortedMapState handle. Keys are the second field of the input, values are occurrence counts.
     */
    private transient SortedMapState<Long, Long> state;

    @Override
    public void flatMap(
            Tuple2<Long, Long> input,
            Collector<Tuple3<Long, Long, Long>> out) throws Exception {

        // access the state
        Long count = state.get(input.f1);
        // If it hasn't been used before, it will be null
        if (count == null) {
            count = 0L;
        }
        // update the state
        state.put(input.f1, count + 1);

        // emit the entry with the smallest key
        Map.Entry<Long, Long> firstEntry = state.firstEntry();
        out.collect(new Tuple3<>(input.f0, firstEntry.getKey(), firstEntry.getValue()));
    }

    @Override
    public void open(Configuration config) {
        SortedMapStateDescriptor<Long, Long> descriptor =
                new SortedMapStateDescriptor<>(
                        "sortedmapstate", // the state name
                        BytewiseComparator.LONG_INSTANCE,
                        Long.class,
                        Long.class);
        state = getRuntimeContext().getSortedMapState(descriptor);
    }
}

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
            Tuple2.of(1L, 4L), Tuple2.of(1L, 2L), Tuple2.of(1L, 2L), Tuple2.of(1L, 1L))
        .keyBy(0)
        .flatMap(new CountWindowFirstEntry())
        .print();
    // the printed output will be (1, 3, 1), (1, 3, 1), (1, 3, 2), (1, 3, 2), (1, 3, 2),
    // (1, 2, 1), (1, 2, 2) and (1, 1, 1)
    env.execute();
}
{% endhighlight %}

{% highlight scala %}
class CountWindowFirstEntry extends RichFlatMapFunction[(Long, Long), (Long, Long, Long)] {

  private var state: SortedMapState[java.lang.Long, java.lang.Long] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentCount = state.get(input._2)

    // If it hasn't been used before, it will be null
    val count: Long = if (tmpCurrentCount != null) {
      tmpCurrentCount
    } else {
      0L
    }

    // update the state
    state.put(input._2, count + 1)

    // emit the entry with the smallest key
    val entry = state.firstEntry()
    out.collect((input._1, entry.getKey, entry.getValue))
  }

  override def open(parameters: Configuration): Unit = {
    state = getRuntimeContext.getSortedMapState(
      new SortedMapStateDescriptor[java.lang.Long, java.lang.Long](
        "sortedmapstate",
        BytewiseComparator.LONG_INSTANCE,
        classOf[java.lang.Long],
        classOf[java.lang.Long])
    )
  }
}

object CountWindowFirstEntry extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List[(Long, Long)](
    (1L, 3L),
    (1L, 5L),
    (1L, 3L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L),
    (1L, 2L),
    (1L, 1L)
  )).keyBy(_._1)
    .flatMap(new CountWindowFirstEntry())
    .print()
  // the printed output will be (1, 3, 1), (1, 3, 1), (1, 3, 2), (1, 3, 2), (1, 3, 2),
  // (1, 2, 1), (1, 2, 2) and (1, 1, 1)

  env.execute("ExampleManagedState")
}
{% endhighlight %}

State in the Scala DataStream API

In addition to the interface described above, the Scala API has shortcuts for stateful map() or flatMap() functions with a single ValueState on KeyedStream. The user function gets the current value of the ValueState in an Option and must return an updated value that will be used to update the state.

{% highlight scala %}
val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })
{% endhighlight %}

Using Managed Operator State

To use managed operator state, a stateful function can implement either the more general CheckpointedFunction interface, or the ListCheckpointed<T extends Serializable> interface.

CheckpointedFunction

The CheckpointedFunction interface provides access to non-keyed state with different redistribution schemes. It requires the implementation of two methods:

{% highlight java %}
void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;
{% endhighlight %}

Whenever a checkpoint has to be performed, snapshotState() is called. The counterpart, initializeState(), is called every time the user-defined function is initialized, be that when the function is first initialized or be that when the function is actually recovering from an earlier checkpoint. Given this, initializeState() is not only the place where different types of state are initialized, but also where state recovery logic is included.

Currently, list-style managed operator state is supported. The state is expected to be a List of serializable objects, independent from each other, thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which non-keyed state can be redistributed. Depending on the state accessing method, the following redistribution schemes are defined:

  • Even-split redistribution: Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators. Each operator gets a sublist, which can be empty, or contain one or more elements. As an example, if with parallelism 1 the checkpointed state of an operator contains elements element1 and element2, when increasing the parallelism to 2, element1 may end up in operator instance 0, while element2 will go to operator instance 1.

  • Union redistribution: Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, each operator gets the complete list of state elements.
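The two schemes above can be sketched in plain Java as operations on lists of lists, one inner list per parallel instance. This is only a model: the round-robin assignment used for the even split is an assumption made for illustration (the text only promises an even division), and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class RedistributionSketch {

    // The whole state is logically the concatenation of all per-instance lists.
    static <T> List<T> concat(List<List<T>> perInstance) {
        List<T> all = new ArrayList<>();
        for (List<T> part : perInstance) {
            all.addAll(part);
        }
        return all;
    }

    // Even-split: every element goes to exactly one new instance.
    static <T> List<List<T>> evenSplit(List<List<T>> oldState, int newParallelism) {
        List<T> all = concat(oldState);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i)); // assumed round-robin
        }
        return result;
    }

    // Union: every new instance receives the complete list.
    static <T> List<List<T>> union(List<List<T>> oldState, int newParallelism) {
        List<T> all = concat(oldState);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> old = new ArrayList<>();
        old.add(List.of("element1", "element2"));
        System.out.println(evenSplit(old, 2)); // [[element1], [element2]]
        System.out.println(union(old, 2));     // [[element1, element2], [element1, element2]]
    }
}
```

With union redistribution each instance has to decide for itself which of the restored elements it is responsible for, since every instance sees all of them.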

Below is an example of a stateful SinkFunction that uses CheckpointedFunction to buffer elements before sending them to the outside world. It demonstrates the basic even-split redistribution list state:

{% highlight java %}
public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element : bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // replace the previously checkpointed elements with the current buffer
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // refill the buffer when recovering from a checkpoint
        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
{% endhighlight %}

{% highlight scala %}
import scala.collection.JavaConverters._

class BufferingSink(threshold: Int)
  extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

  @transient
  private var checkpointedState: ListState[(String, Int)] = _

  private val bufferedElements = ListBuffer[(String, Int)]()

  override def invoke(value: (String, Int)): Unit = {
    bufferedElements += value
    if (bufferedElements.size == threshold) {
      for (element <- bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear()
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )

    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    if (context.isRestored) {
      for (element <- checkpointedState.get().asScala) {
        bufferedElements += element
      }
    }
  }
}
{% endhighlight %}

The initializeState method takes a FunctionInitializationContext as its argument. The context is used to initialize the non-keyed state “containers”: objects of type ListState in which the non-keyed state objects are stored upon checkpointing.

Note how the state is initialized, similar to keyed state, with a StateDescriptor that contains the state name and information about the type of the value that the state holds:

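{% highlight java %}
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);
{% endhighlight %}

{% highlight scala %}
val descriptor = new ListStateDescriptor[(String, Int)](
  "buffered-elements",
  TypeInformation.of(new TypeHint[(String, Int)]() {})
)

checkpointedState = context.getOperatorStateStore.getListState(descriptor)
{% endhighlight %}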

After initializing the container, we use the isRestored() method of the context to check if we are recovering after a failure. If this is true, i.e. we are recovering, the restore logic is applied.

As shown in the code of the modified BufferingSink, this ListState recovered during state initialization is kept in a class variable for future use in snapshotState(). There the ListState is cleared of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.

As a side note, the keyed state can also be initialized in the initializeState() method. This can be done using the provided FunctionInitializationContext.

ListCheckpointed

The ListCheckpointed interface is a more limited variant of CheckpointedFunction, which only supports list-style state with even-split redistribution scheme on restore. It also requires the implementation of two methods:

{% highlight java %}
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;
{% endhighlight %}

On snapshotState() the operator should return a list of objects to checkpoint and restoreState has to handle such a list upon recovery. If the state is not re-partitionable, you can always return a Collections.singletonList(MY_STATE) in the snapshotState().

Stateful Source Functions

Stateful sources require a bit more care than other operators. To make the updates to the state and the output collection atomic (required for exactly-once semantics on failure/recovery), the user must acquire a lock from the source's context.

{% highlight java %}
public class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /** current offset for exactly once semantics */
    private Long offset = 0L;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state) {
            offset = s;
        }
    }
}
{% endhighlight %}

{% highlight scala %}
import scala.collection.JavaConverters._

class CounterSource
    extends RichParallelSourceFunction[Long]
    with ListCheckpointed[Long] {

  @volatile
  private var isRunning = true

  private var offset = 0L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    val lock = ctx.getCheckpointLock

    while (isRunning) {
      // output and state update are atomic
      lock.synchronized({
        ctx.collect(offset)

        offset += 1
      })
    }
  }

  override def cancel(): Unit = isRunning = false

  override def restoreState(state: util.List[Long]): Unit =
    for (s <- state.asScala) {
      offset = s
    }

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
    Collections.singletonList(offset)
}
{% endhighlight %}

Some operators need to know when a checkpoint has been fully acknowledged by Flink, so that they can communicate this to the outside world. For this case, see the org.apache.flink.runtime.state.CheckpointListener interface.
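One way such a notification might be used is to hold back output until the checkpoint that covers it has been acknowledged. The following plain-Java sketch models that idea under stated assumptions: `Listener` is a local stand-in with a single `notifyCheckpointComplete(long)` callback, and `StagingSink`, `write` and `snapshot` are hypothetical names that are not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CommitOnNotifySketch {

    // Local stand-in for a checkpoint-completion callback.
    interface Listener {
        void notifyCheckpointComplete(long checkpointId);
    }

    static class StagingSink implements Listener {
        // Records written since the last snapshot.
        private List<String> current = new ArrayList<>();
        // Records frozen per checkpoint, waiting for acknowledgement.
        private final TreeMap<Long, List<String>> pending = new TreeMap<>();
        // Records that have been made visible to the outside world.
        final List<String> committed = new ArrayList<>();

        void write(String record) {
            current.add(record);
        }

        // Called when a checkpoint is taken: freeze what was written so far.
        void snapshot(long checkpointId) {
            pending.put(checkpointId, current);
            current = new ArrayList<>();
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            // Commit everything staged up to and including the acknowledged checkpoint.
            Map<Long, List<String>> done = pending.headMap(checkpointId, true);
            for (List<String> batch : done.values()) {
                committed.addAll(batch);
            }
            done.clear(); // the view is backed by 'pending', so this removes the entries
        }
    }

    public static void main(String[] args) {
        StagingSink sink = new StagingSink();
        sink.write("a");
        sink.snapshot(1L);
        sink.write("b");
        sink.notifyCheckpointComplete(1L);
        System.out.println(sink.committed); // [a]
    }
}
```

In this model, record "b" stays staged until a later checkpoint that includes it is both taken and acknowledged.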

{% top %}