| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.api.common.state; |
| |
| import org.apache.flink.annotation.PublicEvolving; |
| import org.apache.flink.api.common.typeutils.BytewiseComparator; |
| |
| /** |
| * This interface contains methods for registering keyed state with a managed store. |
| */ |
| @PublicEvolving |
| public interface KeyedStateStore { |
| |
| /** |
| * Gets a handle to the system's key/value state. The key/value state is only accessible |
| * if the function is executed on a KeyedStream. On each access, the state exposes the value |
| * for the key of the element currently processed by the function. |
| * Each function may have multiple partitioned states, addressed with different names. |
| * |
| * <p>Because the scope of each value is the key of the currently processed element, |
| * and the elements are distributed by the Flink runtime, the system can transparently |
| * scale out and redistribute the state and KeyedStream. |
| * |
| * <p>The following code example shows how to implement a continuous counter that counts |
| * how many times elements of a certain key occur, and emits an updated count for that |
| * element on each occurrence. |
| * |
| * <pre>{@code |
| * DataStream<MyType> stream = ...; |
| * KeyedStream<MyType> keyedStream = stream.keyBy("id"); |
| * |
| * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() { |
| * |
| *     private ValueState<Long> state; |
| * |
| *     public void open(Configuration cfg) { |
| *         state = getRuntimeContext().getState( |
| *                 new ValueStateDescriptor<Long>("count", LongSerializer.INSTANCE, 0L)); |
| *     } |
| * |
| *     public Tuple2<MyType, Long> map(MyType value) { |
| *         long count = state.value() + 1; |
| *         state.update(count); |
| *         return new Tuple2<>(value, count); |
| *     } |
| * }); |
| * }</pre> |
| * |
| * @param stateProperties The descriptor defining the properties of the state. |
| * |
| * @param <T> The type of value stored in the state. |
| * |
| * @return The partitioned state object. |
| * |
| * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the |
| * function (function is not part of a KeyedStream). |
| */ |
| @PublicEvolving |
| <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties); |
| |
| /** |
| * Gets a handle to the system's key/value list state. This state is similar to the state |
| * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that |
| * holds lists. One can add elements to the list, or retrieve the list as a whole. |
| * |
| * <p>This state is only accessible if the function is executed on a KeyedStream. |
| * |
| * <p>The following code example buffers elements per key and emits the buffered elements |
| * once a divider element arrives. |
| * |
| * <pre>{@code |
| * DataStream<MyType> stream = ...; |
| * KeyedStream<MyType> keyedStream = stream.keyBy("id"); |
| * |
| * keyedStream.flatMap(new RichFlatMapFunction<MyType, MyType>() { |
| * |
| *     private ListState<MyType> state; |
| * |
| *     public void open(Configuration cfg) { |
| *         state = getRuntimeContext().getListState( |
| *                 new ListStateDescriptor<>("myState", MyType.class)); |
| *     } |
| * |
| *     public void flatMap(MyType value, Collector<MyType> out) { |
| *         if (value.isDivider()) { |
| *             for (MyType t : state.get()) { |
| *                 out.collect(t); |
| *             } |
| *         } else { |
| *             state.add(value); |
| *         } |
| *     } |
| * }); |
| * }</pre> |
| * |
| * @param stateProperties The descriptor defining the properties of the state. |
| * |
| * @param <T> The type of value stored in the state. |
| * |
| * @return The partitioned state object. |
| * |
| * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the |
| * function (function is not part of a KeyedStream). |
| */ |
| @PublicEvolving |
| <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties); |
| |
| /** |
| * Gets a handle to the system's key/value reducing state. This state is similar to the state |
| * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that |
| * aggregates values. |
| * |
| * <p>This state is only accessible if the function is executed on a KeyedStream. |
| * |
| * <p>The following code example keeps a running sum per key and emits it with every element. |
| * |
| * <pre>{@code |
| * DataStream<MyType> stream = ...; |
| * KeyedStream<MyType> keyedStream = stream.keyBy("id"); |
| * |
| * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() { |
| * |
| *     private ReducingState<Long> state; |
| * |
| *     public void open(Configuration cfg) { |
| *         state = getRuntimeContext().getReducingState( |
| *                 new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class)); |
| *     } |
| * |
| *     public Tuple2<MyType, Long> map(MyType value) { |
| *         state.add(value.count()); |
| *         return new Tuple2<>(value, state.get()); |
| *     } |
| * }); |
| * |
| * }</pre> |
| * |
| * @param stateProperties The descriptor defining the properties of the state. |
| * |
| * @param <T> The type of value stored in the state. |
| * |
| * @return The partitioned state object. |
| * |
| * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the |
| * function (function is not part of a KeyedStream). |
| */ |
| @PublicEvolving |
| <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties); |
| |
| /** |
| * Gets a handle to the system's key/value aggregating state. This state is similar to the state |
| * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that |
| * aggregates values with different types. |
| * |
| * <p>This state is only accessible if the function is executed on a KeyedStream. |
| * |
| * <pre>{@code |
| * DataStream<MyType> stream = ...; |
| * KeyedStream<MyType> keyedStream = stream.keyBy("id"); |
| * AggregateFunction<...> aggregateFunction = ...; |
| * |
| * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() { |
| * |
| *     private AggregatingState<MyType, Long> state; |
| * |
| *     public void open(Configuration cfg) { |
| *         state = getRuntimeContext().getAggregatingState( |
| *                 new AggregatingStateDescriptor<>("sum", aggregateFunction, Long.class)); |
| *     } |
| * |
| *     public Tuple2<MyType, Long> map(MyType value) { |
| *         state.add(value); |
| *         return new Tuple2<>(value, state.get()); |
| *     } |
| * }); |
| * |
| * }</pre> |
| * |
| * @param stateProperties The descriptor defining the properties of the state. |
| * |
| * @param <IN> The type of the values that are added to the state. |
| * @param <ACC> The type of the accumulator (intermediate aggregation state). |
| * @param <OUT> The type of the values that are returned from the state. |
| * |
| * @return The partitioned state object. |
| * |
| * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the |
| * function (function is not part of a KeyedStream). |
| */ |
| @PublicEvolving |
| <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties); |
| |
| /** |
| * Gets a handle to the system's key/value folding state. This state is similar to the state |
| * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that |
| * aggregates values with different types. |
| * |
| * <p>This state is only accessible if the function is executed on a KeyedStream. |
| * |
| * <pre>{@code |
| * DataStream<MyType> stream = ...; |
| * KeyedStream<MyType> keyedStream = stream.keyBy("id"); |
| * |
| * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() { |
| * |
| *     private FoldingState<MyType, Long> state; |
| * |
| *     public void open(Configuration cfg) { |
| *         state = getRuntimeContext().getFoldingState( |
| *                 new FoldingStateDescriptor<>("sum", 0L, (a, b) -> a.count() + b, Long.class)); |
| *     } |
| * |
| *     public Tuple2<MyType, Long> map(MyType value) { |
| *         state.add(value); |
| *         return new Tuple2<>(value, state.get()); |
| *     } |
| * }); |
| * |
| * }</pre> |
| * |
| * @param stateProperties The descriptor defining the properties of the state. |
| * |
| * @param <T> The type of the values that are added to (folded into) the state. |
| * @param <ACC> The type of the value held by the state and returned from it. |
| * |
| * @return The partitioned state object. |
| * |
| * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the |
| * function (function is not part of a KeyedStream). |
| * |
| * @deprecated will be removed in a future version in favor of {@link AggregatingState} |
| */ |
| @PublicEvolving |
| @Deprecated |
| <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties); |
| |
| /** |
| * Gets a handle to the system's key/value map state. This state is similar to the state |
| * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that |
| * is composed of user-defined key-value pairs. |
| * |
| * <p>This state is only accessible if the function is executed on a KeyedStream. |
| * |
| * <pre>{@code |
| * DataStream<MyType> stream = ...; |
| * KeyedStream<MyType> keyedStream = stream.keyBy("id"); |
| * |
| * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() { |
| * |
| *     private MapState<MyType, Long> state; |
| * |
| *     public void open(Configuration cfg) { |
| *         state = getRuntimeContext().getMapState( |
| *                 new MapStateDescriptor<>("sum", MyType.class, Long.class)); |
| *     } |
| * |
| *     public Tuple2<MyType, Long> map(MyType value) { |
| *         return new Tuple2<>(value, state.get(value)); |
| *     } |
| * }); |
| * |
| * }</pre> |
| * |
| * @param stateProperties The descriptor defining the properties of the state. |
| * |
| * @param <UK> The type of the user keys stored in the state. |
| * @param <UV> The type of the user values stored in the state. |
| * |
| * @return The partitioned state object. |
| * |
| * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the |
| * function (function is not part of a KeyedStream). |
| */ |
| @PublicEvolving |
| <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties); |
| |
| /** |
| * Gets a handle to the system's key/value sorted map state. This state is similar to the state |
| * accessed via {@link #getMapState(MapStateDescriptor)}, but the state is sorted under the given comparator. |
| * |
| * <p><b>IMPORTANT:</b> For RocksDBStateBackend, we only support {@link BytewiseComparator}. |
| * The serialized forms in {@link BytewiseComparator} are identical to that of the values |
| * only when the numbers to compare are both not negative. |
| * |
| * Serializers under {@link org.apache.flink.table.typeutils.ordered} maybe helpful if you want to use SortedMapState. |
| * </p> |
| * |
| * <p>This state is only accessible if the function is executed on a KeyedStream. |
| * |
| * <pre>{@code |
| * DataStream<MyType> stream = ...; |
| * KeyedStream<MyType> keyedStream = stream.keyBy("id"); |
| * |
| * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() { |
| * |
| * private SortedMapState<MyType, Long> state; |
| * |
| * public void open(Configuration cfg) { |
| * state = getRuntimeContext().getSortedMapState( |
| * new SortedMapStateDescriptor<>("sum", new NaturalComparator<>(), MyType.class, Long.class)); |
| * } |
| * |
| * public Tuple2<MyType, Long> map(MyType value) { |
| * return new Tuple2<>(value, state.get(value)); |
| * } |
| * }); |
| * |
| * }</pre> |
| * |
| * @param stateProperties The descriptor defining the properties of the stats. |
| * |
| * @param <UK> The type of the user keys stored in the state. |
| * @param <UV> The type of the user values stored in the state. |
| * |
| * @return The partitioned state object. |
| * |
| * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the |
| * function (function is not part of a KeyedStream). |
| */ |
| @PublicEvolving |
| default <UK, UV> SortedMapState<UK, UV> getSortedMapState(SortedMapStateDescriptor<UK, UV> stateProperties) { |
| throw new UnsupportedOperationException("This state is only accessible by functions executed on a KeyedStream"); |
| } |
| } |