| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.state.api; |
| |
| import org.apache.flink.annotation.PublicEvolving; |
| import org.apache.flink.api.common.functions.AggregateFunction; |
| import org.apache.flink.api.common.functions.ReduceFunction; |
| import org.apache.flink.api.common.typeinfo.TypeInformation; |
| import org.apache.flink.api.common.typeutils.TypeSerializer; |
| import org.apache.flink.api.java.DataSet; |
| import org.apache.flink.api.java.ExecutionEnvironment; |
| import org.apache.flink.runtime.state.StateBackend; |
| import org.apache.flink.state.api.functions.WindowReaderFunction; |
| import org.apache.flink.state.api.input.KeyedStateInputFormat; |
| import org.apache.flink.state.api.input.operator.WindowReaderOperator; |
| import org.apache.flink.state.api.input.operator.window.AggregateEvictingWindowReaderFunction; |
| import org.apache.flink.state.api.input.operator.window.PassThroughReader; |
| import org.apache.flink.state.api.input.operator.window.ProcessEvictingWindowReader; |
| import org.apache.flink.state.api.input.operator.window.ReduceEvictingWindowReaderFunction; |
| import org.apache.flink.state.api.runtime.metadata.SavepointMetadata; |
| import org.apache.flink.streaming.api.windowing.windows.Window; |
| import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; |
| import org.apache.flink.util.Preconditions; |
| |
| import javax.annotation.Nullable; |
| |
| import java.io.IOException; |
| |
| /** |
| * This class provides entry points for reading keyed state written out using the {@code |
| * WindowOperator}. |
| * |
| * @param <W> The type of {@code Window}. |
| */ |
| @PublicEvolving |
| @Deprecated |
| public class EvictingWindowReader<W extends Window> { |
| |
| /** The batch execution environment. Used for creating inputs for reading state. */ |
| private final ExecutionEnvironment env; |
| |
| /** |
| * The savepoint metadata, which maintains the current set of existing / newly added operator |
| * states. |
| */ |
| private final SavepointMetadata metadata; |
| |
| /** |
| * The state backend that was previously used to write existing operator states in this |
| * savepoint. This is also the state backend that will be used when writing again this existing |
| * savepoint. |
| */ |
| @Nullable private final StateBackend stateBackend; |
| |
| /** The window serializer used to write the window operator. */ |
| private final TypeSerializer<W> windowSerializer; |
| |
| EvictingWindowReader( |
| ExecutionEnvironment env, |
| SavepointMetadata metadata, |
| @Nullable StateBackend stateBackend, |
| TypeSerializer<W> windowSerializer) { |
| Preconditions.checkNotNull(env, "The execution environment must not be null"); |
| Preconditions.checkNotNull(metadata, "The savepoint metadata must not be null"); |
| Preconditions.checkNotNull(windowSerializer, "The window serializer must not be null"); |
| |
| this.env = env; |
| this.metadata = metadata; |
| this.stateBackend = stateBackend; |
| this.windowSerializer = windowSerializer; |
| } |
| |
| /** |
| * Reads window state generated using a {@link ReduceFunction}. |
| * |
| * @param uid The uid of the operator. |
| * @param function The reduce function used to create the window. |
| * @param keyType The key type of the window. |
| * @param reduceType The type information of the reduce function. |
| * @param <T> The type of the reduce function. |
| * @param <K> The key type of the operator. |
| * @return A {@code DataSet} of objects read from keyed state. |
| * @throws IOException If savepoint does not contain the specified uid. |
| */ |
| public <T, K> DataSet<T> reduce( |
| String uid, |
| ReduceFunction<T> function, |
| TypeInformation<K> keyType, |
| TypeInformation<T> reduceType) |
| throws IOException { |
| |
| return reduce(uid, function, new PassThroughReader<>(), keyType, reduceType, reduceType); |
| } |
| |
| /** |
| * Reads window state generated using a {@link ReduceFunction}. |
| * |
| * @param uid The uid of the operator. |
| * @param function The reduce function used to create the window. |
| * @param readerFunction The window reader function. |
| * @param keyType The key type of the window. |
| * @param reduceType The type information of the reduce function. |
| * @param outputType The output type of the reader function. |
| * @param <K> The type of the key. |
| * @param <T> The type of the reduce function. |
| * @param <OUT> The output type of the reduce function. |
| * @return A {@code DataSet} of objects read from keyed state. |
| * @throws IOException If savepoint does not contain the specified uid. |
| */ |
| public <K, T, OUT> DataSet<OUT> reduce( |
| String uid, |
| ReduceFunction<T> function, |
| WindowReaderFunction<T, OUT, K, W> readerFunction, |
| TypeInformation<K> keyType, |
| TypeInformation<T> reduceType, |
| TypeInformation<OUT> outputType) |
| throws IOException { |
| |
| WindowReaderOperator<?, K, StreamRecord<T>, W, OUT> operator = |
| WindowReaderOperator.evictingWindow( |
| new ReduceEvictingWindowReaderFunction<>(readerFunction, function), |
| keyType, |
| windowSerializer, |
| reduceType, |
| env.getConfig()); |
| |
| return readWindowOperator(uid, outputType, operator); |
| } |
| |
| /** |
| * Reads window state generated using an {@link AggregateFunction}. |
| * |
| * @param uid The uid of the operator. |
| * @param aggregateFunction The aggregate function used to create the window. |
| * @param keyType The key type of the window. |
| * @param inputType The type information of the accumulator function. |
| * @param outputType The output type of the reader function. |
| * @param <K> The type of the key. |
| * @param <T> The type of the values that are aggregated. |
| * @param <ACC> The type of the accumulator (intermediate aggregate state). |
| * @param <R> The type of the aggregated result. |
| * @return A {@code DataSet} of objects read from keyed state. |
| * @throws IOException If savepoint does not contain the specified uid. |
| */ |
| public <K, T, ACC, R> DataSet<R> aggregate( |
| String uid, |
| AggregateFunction<T, ACC, R> aggregateFunction, |
| TypeInformation<K> keyType, |
| TypeInformation<T> inputType, |
| TypeInformation<R> outputType) |
| throws IOException { |
| |
| return aggregate( |
| uid, aggregateFunction, new PassThroughReader<>(), keyType, inputType, outputType); |
| } |
| |
| /** |
| * Reads window state generated using an {@link AggregateFunction}. |
| * |
| * @param uid The uid of the operator. |
| * @param aggregateFunction The aggregate function used to create the window. |
| * @param readerFunction The window reader function. |
| * @param keyType The key type of the window. |
| * @param inputType The type information of the accumulator function. |
| * @param outputType The output type of the reader function. |
| * @param <K> The type of the key. |
| * @param <T> The type of the values that are aggregated. |
| * @param <ACC> The type of the accumulator (intermediate aggregate state). |
| * @param <R> The type of the aggregated result. |
| * @param <OUT> The output type of the reader function. |
| * @return A {@code DataSet} of objects read from keyed state. |
| * @throws IOException If savepoint does not contain the specified uid. |
| */ |
| public <K, T, ACC, R, OUT> DataSet<OUT> aggregate( |
| String uid, |
| AggregateFunction<T, ACC, R> aggregateFunction, |
| WindowReaderFunction<R, OUT, K, W> readerFunction, |
| TypeInformation<K> keyType, |
| TypeInformation<T> inputType, |
| TypeInformation<OUT> outputType) |
| throws IOException { |
| |
| WindowReaderOperator<?, K, StreamRecord<T>, W, OUT> operator = |
| WindowReaderOperator.evictingWindow( |
| new AggregateEvictingWindowReaderFunction<>( |
| readerFunction, aggregateFunction), |
| keyType, |
| windowSerializer, |
| inputType, |
| env.getConfig()); |
| |
| return readWindowOperator(uid, outputType, operator); |
| } |
| |
| /** |
| * Reads window state generated without any preaggregation such as {@code WindowedStream#apply} |
| * and {@code WindowedStream#process}. |
| * |
| * @param uid The uid of the operator. |
| * @param readerFunction The window reader function. |
| * @param keyType The key type of the window. |
| * @param stateType The type of records stored in state. |
| * @param outputType The output type of the reader function. |
| * @param <K> The type of the key. |
| * @param <T> The type of the records stored in state. |
| * @param <OUT> The output type of the reader function. |
| * @return A {@code DataSet} of objects read from keyed state. |
| * @throws IOException If the savepoint does not contain the specified uid. |
| */ |
| public <K, T, OUT> DataSet<OUT> process( |
| String uid, |
| WindowReaderFunction<T, OUT, K, W> readerFunction, |
| TypeInformation<K> keyType, |
| TypeInformation<T> stateType, |
| TypeInformation<OUT> outputType) |
| throws IOException { |
| |
| WindowReaderOperator<?, K, StreamRecord<T>, W, OUT> operator = |
| WindowReaderOperator.evictingWindow( |
| new ProcessEvictingWindowReader<>(readerFunction), |
| keyType, |
| windowSerializer, |
| stateType, |
| env.getConfig()); |
| |
| return readWindowOperator(uid, outputType, operator); |
| } |
| |
| private <K, T, OUT> DataSet<OUT> readWindowOperator( |
| String uid, |
| TypeInformation<OUT> outputType, |
| WindowReaderOperator<?, K, T, W, OUT> operator) |
| throws IOException { |
| KeyedStateInputFormat<K, W, OUT> format = |
| new KeyedStateInputFormat<>( |
| metadata.getOperatorState(uid), |
| stateBackend, |
| env.getConfiguration(), |
| operator); |
| |
| return env.createInput(format, outputType); |
| } |
| } |