blob: 5dd04d44f233ad2095d46e4c523b783c52d7124d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.state;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeutils.BytewiseComparator;
/**
* This interface contains methods for registering keyed state with a managed store.
*/
@PublicEvolving
public interface KeyedStateStore {
/**
 * Gets a handle to the system's key/value state. The key/value state is only accessible
 * if the function is executed on a KeyedStream. On each access, the state exposes the value
 * for the key of the element currently processed by the function.
 * Each function may have multiple partitioned states, addressed with different names.
 *
 * <p>Because the scope of each value is the key of the currently processed element,
 * and the elements are distributed by the Flink runtime, the system can transparently
 * scale out and redistribute the state and KeyedStream.
 *
 * <p>The following code example shows how to implement a continuous counter that counts
 * how many times elements of a certain key occur, and emits an updated count for that
 * element on each occurrence.
 *
 * <pre>{@code
 * DataStream<MyType> stream = ...;
 * KeyedStream<MyType> keyedStream = stream.keyBy("id");
 *
 * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
 *
 *     private ValueState<Long> state;
 *
 *     public void open(Configuration cfg) {
 *         state = getRuntimeContext().getState(
 *                 new ValueStateDescriptor<Long>("count", LongSerializer.INSTANCE, 0L));
 *     }
 *
 *     public Tuple2<MyType, Long> map(MyType value) {
 *         long count = state.value() + 1;
 *         state.update(count);
 *         return new Tuple2<>(value, count);
 *     }
 * });
 * }</pre>
 *
 * @param stateProperties The descriptor defining the properties of the state.
 *
 * @param <T> The type of value stored in the state.
 *
 * @return The partitioned state object.
 *
 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
 *                                       function (function is not part of a KeyedStream).
 */
@PublicEvolving
<T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);
/**
 * Gets a handle to the system's key/value list state. This state is similar to the state
 * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
 * holds lists. One can add elements to the list, or retrieve the list as a whole.
 *
 * <p>This state is only accessible if the function is executed on a KeyedStream.
 *
 * <pre>{@code
 * DataStream<MyType> stream = ...;
 * KeyedStream<MyType> keyedStream = stream.keyBy("id");
 *
 * keyedStream.flatMap(new RichFlatMapFunction<MyType, MyType>() {
 *
 *     private ListState<MyType> state;
 *
 *     public void open(Configuration cfg) {
 *         state = getRuntimeContext().getListState(
 *                 new ListStateDescriptor<>("myState", MyType.class));
 *     }
 *
 *     public void flatMap(MyType value, Collector<MyType> out) {
 *         if (value.isDivider()) {
 *             for (MyType t : state.get()) {
 *                 out.collect(t);
 *             }
 *         } else {
 *             state.add(value);
 *         }
 *     }
 * });
 * }</pre>
 *
 * @param stateProperties The descriptor defining the properties of the state.
 *
 * @param <T> The type of value stored in the state.
 *
 * @return The partitioned state object.
 *
 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
 *                                       function (function is not part of a KeyedStream).
 */
@PublicEvolving
<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);
/**
 * Gets a handle to the system's key/value reducing state. This state is similar to the state
 * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
 * aggregates values.
 *
 * <p>This state is only accessible if the function is executed on a KeyedStream.
 *
 * <pre>{@code
 * DataStream<MyType> stream = ...;
 * KeyedStream<MyType> keyedStream = stream.keyBy("id");
 *
 * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
 *
 *     private ReducingState<Long> state;
 *
 *     public void open(Configuration cfg) {
 *         state = getRuntimeContext().getReducingState(
 *                 new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class));
 *     }
 *
 *     public Tuple2<MyType, Long> map(MyType value) {
 *         state.add(value.count());
 *         return new Tuple2<>(value, state.get());
 *     }
 * });
 * }</pre>
 *
 * @param stateProperties The descriptor defining the properties of the state.
 *
 * @param <T> The type of value stored in the state.
 *
 * @return The partitioned state object.
 *
 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
 *                                       function (function is not part of a KeyedStream).
 */
@PublicEvolving
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);
/**
 * Gets a handle to the system's key/value aggregating state. This state is similar to the state
 * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
 * aggregates values, where the type of the values added to the state may differ from the
 * type of the values returned from it.
 *
 * <p>This state is only accessible if the function is executed on a KeyedStream.
 *
 * <pre>{@code
 * DataStream<MyType> stream = ...;
 * KeyedStream<MyType> keyedStream = stream.keyBy("id");
 * AggregateFunction<...> aggregateFunction = ...;
 *
 * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
 *
 *     private AggregatingState<MyType, Long> state;
 *
 *     public void open(Configuration cfg) {
 *         state = getRuntimeContext().getAggregatingState(
 *                 new AggregatingStateDescriptor<>("sum", aggregateFunction, Long.class));
 *     }
 *
 *     public Tuple2<MyType, Long> map(MyType value) {
 *         state.add(value);
 *         return new Tuple2<>(value, state.get());
 *     }
 * });
 * }</pre>
 *
 * @param stateProperties The descriptor defining the properties of the state.
 *
 * @param <IN> The type of the values that are added to the state.
 * @param <ACC> The type of the accumulator (intermediate aggregation state).
 * @param <OUT> The type of the values that are returned from the state.
 *
 * @return The partitioned state object.
 *
 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
 *                                       function (function is not part of a KeyedStream).
 */
@PublicEvolving
<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);
/**
 * Gets a handle to the system's key/value folding state. This state is similar to the state
 * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
 * aggregates values with different types.
 *
 * <p>This state is only accessible if the function is executed on a KeyedStream.
 *
 * <pre>{@code
 * DataStream<MyType> stream = ...;
 * KeyedStream<MyType> keyedStream = stream.keyBy("id");
 *
 * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
 *
 *     private FoldingState<MyType, Long> state;
 *
 *     public void open(Configuration cfg) {
 *         state = getRuntimeContext().getFoldingState(
 *                 new FoldingStateDescriptor<>("sum", 0L, (a, b) -> a.count() + b, Long.class));
 *     }
 *
 *     public Tuple2<MyType, Long> map(MyType value) {
 *         state.add(value);
 *         return new Tuple2<>(value, state.get());
 *     }
 * });
 * }</pre>
 *
 * @param stateProperties The descriptor defining the properties of the state.
 *
 * @param <T> The type of the values that are added to (folded into) the state.
 * @param <ACC> The type of the folded value that is held in, and returned from, the state.
 *
 * @return The partitioned state object.
 *
 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
 *                                       function (function is not part of a KeyedStream).
 *
 * @deprecated will be removed in a future version in favor of {@link AggregatingState}
 */
@PublicEvolving
@Deprecated
<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
/**
 * Gets a handle to the system's key/value map state. This state is similar to the state
 * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
 * is composed of user-defined key-value pairs.
 *
 * <p>This state is only accessible if the function is executed on a KeyedStream.
 *
 * <pre>{@code
 * DataStream<MyType> stream = ...;
 * KeyedStream<MyType> keyedStream = stream.keyBy("id");
 *
 * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
 *
 *     private MapState<MyType, Long> state;
 *
 *     public void open(Configuration cfg) {
 *         state = getRuntimeContext().getMapState(
 *                 new MapStateDescriptor<>("sum", MyType.class, Long.class));
 *     }
 *
 *     public Tuple2<MyType, Long> map(MyType value) {
 *         return new Tuple2<>(value, state.get(value));
 *     }
 * });
 * }</pre>
 *
 * @param stateProperties The descriptor defining the properties of the state.
 *
 * @param <UK> The type of the user keys stored in the state.
 * @param <UV> The type of the user values stored in the state.
 *
 * @return The partitioned state object.
 *
 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
 *                                       function (function is not part of a KeyedStream).
 */
@PublicEvolving
<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
/**
 * Gets a handle to the system's key/value sorted map state. This state is similar to the state
 * accessed via {@link #getMapState(MapStateDescriptor)}, but its entries are kept sorted under
 * the given comparator.
 *
 * <p><b>IMPORTANT:</b> For RocksDBStateBackend, only {@link BytewiseComparator} is supported.
 * Under {@link BytewiseComparator}, the order of the serialized forms is identical to that of
 * the values only when the numbers being compared are both non-negative.
 *
 * <p>Serializers under {@link org.apache.flink.table.typeutils.ordered} may be helpful if you
 * want to use SortedMapState.
 *
 * <p>This state is only accessible if the function is executed on a KeyedStream.
 *
 * <p>The default implementation provided by this interface always throws an
 * {@link UnsupportedOperationException}.
 *
 * <pre>{@code
 * DataStream<MyType> stream = ...;
 * KeyedStream<MyType> keyedStream = stream.keyBy("id");
 *
 * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
 *
 *     private SortedMapState<MyType, Long> state;
 *
 *     public void open(Configuration cfg) {
 *         state = getRuntimeContext().getSortedMapState(
 *                 new SortedMapStateDescriptor<>("sum", new NaturalComparator<>(), MyType.class, Long.class));
 *     }
 *
 *     public Tuple2<MyType, Long> map(MyType value) {
 *         return new Tuple2<>(value, state.get(value));
 *     }
 * });
 * }</pre>
 *
 * @param stateProperties The descriptor defining the properties of the state.
 *
 * @param <UK> The type of the user keys stored in the state.
 * @param <UV> The type of the user values stored in the state.
 *
 * @return The partitioned state object.
 *
 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
 *                                       function (function is not part of a KeyedStream).
 */
@PublicEvolving
default <UK, UV> SortedMapState<UK, UV> getSortedMapState(SortedMapStateDescriptor<UK, UV> stateProperties) {
    // Name the message so the reason for the failure reads separately from the throw itself.
    final String keyedStreamOnlyMessage = "This state is only accessible by functions executed on a KeyedStream";
    throw new UnsupportedOperationException(keyedStreamOnlyMessage);
}
}