/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.functions;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.SortedMapState;
import org.apache.flink.api.common.state.SortedMapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.BytewiseComparator;
import org.apache.flink.metrics.MetricGroup;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
* A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
* of the function will have a context through which it can access static contextual information (such as
* the current parallelism) and other constructs like accumulators, broadcast variables, the distributed cache
* and keyed state.
*
* <p>A function can, during runtime, obtain the RuntimeContext via a call to
* {@link AbstractRichFunction#getRuntimeContext()}.
*/
@Public
public interface RuntimeContext {
/**
* Returns the name of the task in which the UDF runs, as assigned during plan construction.
*
* @return The name of the task in which the UDF runs.
*/
String getTaskName();
/**
* Returns the metric group for this parallel subtask.
*
* @return The metric group for this parallel subtask.
*/
@PublicEvolving
MetricGroup getMetricGroup();
/**
* Gets the parallelism with which the parallel task runs.
*
* @return The parallelism with which the parallel task runs.
*/
int getNumberOfParallelSubtasks();
/**
* Gets the max-parallelism with which the parallel task runs.
*
* @return The max-parallelism with which the parallel task runs.
*/
@PublicEvolving
int getMaxNumberOfParallelSubtasks();
/**
* Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
* parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
*
* @return The index of the parallel subtask.
*/
int getIndexOfThisSubtask();
/**
* Gets the attempt number of this parallel subtask. The first attempt is numbered 0.
*
* @return Attempt number of the subtask.
*/
int getAttemptNumber();
/**
* Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)",
* where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be
* {@link #getNumberOfParallelSubtasks()}.
*
* @return The name of the task, with subtask indicator.
*/
String getTaskNameWithSubtasks();
/**
* Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing
* job.
*
* @return The execution config of the currently executing job.
*/
ExecutionConfig getExecutionConfig();
/**
* Gets the ClassLoader to load classes that are not in the system's classpath, but are part of the
* jar file of a user job.
*
* @return The ClassLoader for user code classes.
*/
ClassLoader getUserCodeClassLoader();
// --------------------------------------------------------------------------------------------
/**
* Adds this accumulator. Throws an exception if the accumulator already exists in the same Task.
* Note that the Accumulator name must be unique across the Flink job. Otherwise you will
* get an error when incompatible accumulators from different Tasks are combined at the JobManager
* upon job completion.
*
* @param name The name under which the accumulator is registered.
* @param accumulator The accumulator to add.
*/
<V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator);
/**
* Gets an existing accumulator object. The accumulator must have been added
* previously in this local runtime context.
*
* <p>Throws an exception if the accumulator does not exist or if the
* accumulator exists, but with a different type.
*
* @param name The name under which the accumulator was added.
* @return The accumulator registered under the given name.
*/
<V, A extends Serializable> Accumulator<V, A> getAccumulator(String name);
/**
* Returns a map of all registered accumulators for this task.
* The returned map must not be modified.
*
* @return A map from accumulator name to accumulator.
* @deprecated Use {@link #getAccumulator(String)} to obtain the value of an accumulator.
*/
@Deprecated
@PublicEvolving
Map<String, Accumulator<?, ?>> getAllAccumulators();
/**
* Adds a pre-aggregated accumulator under the given name.
*
* <p>A pre-aggregated accumulator is a special kind of accumulator. It is committed by each task
* of a single job vertex and then aggregated to a single value. Only the aggregated value is
* stored instead of the values from each task, and this process is different from the common
* accumulators.
*
* <p>A pre-aggregated accumulator is different from the Accumulator in that:
* <ul>
* <li> The pre-aggregated accumulator is bound to the tasks of a single JobVertex, and every task
* should commit exactly one partial value.</li>
* <li> The pre-aggregated accumulator does not guarantee successful aggregation. User code should function
* normally without the accumulator.</li>
* <li> The pre-aggregated accumulator guarantees only AT_LEAST_ONCE semantics.</li>
* </ul>
*
* @param name The name of the pre-aggregated accumulator.
* @param accumulator The accumulator holding this task's partial value.
*/
@PublicEvolving
<V, A extends Serializable> void addPreAggregatedAccumulator(String name, Accumulator<V, A> accumulator);
/**
* Gets an uncommitted pre-aggregated accumulator.
*
* @param name The name of the pre-aggregated accumulator.
* @return The uncommitted pre-aggregated accumulator with the given name.
*/
@PublicEvolving
<V, A extends Serializable> Accumulator<V, A> getPreAggregatedAccumulator(String name);
/**
* Commits a pre-aggregated accumulator and removes it from the registered map.
*
* @param name The name of the pre-aggregated accumulator to commit.
*/
@PublicEvolving
void commitPreAggregatedAccumulator(String name);
/**
* Queries a pre-aggregated accumulator asynchronously.
*
* @param name The name of the pre-aggregated accumulator to query.
* @return A future for the queried accumulator.
*/
@PublicEvolving
<V, A extends Serializable> CompletableFuture<Accumulator<V, A>> queryPreAggregatedAccumulator(String name);
/**
* Convenience function to create a counter object for integers.
*
* @param name The name of the counter.
*/
@PublicEvolving
IntCounter getIntCounter(String name);
/**
* Convenience function to create a counter object for longs.
*
* @param name The name of the counter.
*/
@PublicEvolving
LongCounter getLongCounter(String name);
/**
* Convenience function to create a counter object for doubles.
*
* @param name The name of the counter.
*/
@PublicEvolving
DoubleCounter getDoubleCounter(String name);
/**
* Convenience function to create a histogram object.
*
* @param name The name of the histogram.
*/
@PublicEvolving
Histogram getHistogram(String name);
// --------------------------------------------------------------------------------------------
/**
* Tests for the existence of the broadcast variable identified by the
* given {@code name}.
*
* @param name The name under which the broadcast variable is registered.
* @return Whether a broadcast variable exists for the given name.
*/
@PublicEvolving
boolean hasBroadcastVariable(String name);
/**
* Returns the result bound to the broadcast variable identified by the
* given {@code name}.
*
* <p>IMPORTANT: The broadcast variable data structure is shared between the parallel
* tasks on one machine. Any access that modifies its internal state needs to
* be manually synchronized by the caller.
*
* @param name The name under which the broadcast variable is registered.
* @return The broadcast variable, materialized as a list of elements.
*/
<RT> List<RT> getBroadcastVariable(String name);
/**
* Returns the result bound to the broadcast variable identified by the
* given {@code name}. The broadcast variable is returned as a shared data structure
* that is initialized with the given {@link BroadcastVariableInitializer}.
*
* <p>IMPORTANT: The broadcast variable data structure is shared between the parallel
* tasks on one machine. Any access that modifies its internal state needs to
* be manually synchronized by the caller.
*
* @param name The name under which the broadcast variable is registered.
* @param initializer The initializer that creates the shared data structure of the broadcast
* variable from the sequence of elements.
* @param <T> The type of the elements of the broadcast variable.
* @param <C> The type of the shared data structure created by the initializer.
* @return The broadcast variable, as the shared data structure created by the initializer.
*/
<T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer);
/**
* Returns the {@link DistributedCache} to get the local temporary file copies of files otherwise not
* locally accessible.
*
* @return The distributed cache of the worker executing this instance.
*/
DistributedCache getDistributedCache();
// ------------------------------------------------------------------------
// Methods for accessing state
// ------------------------------------------------------------------------
/**
* Gets a handle to the system's key/value state. The key/value state is only accessible
* if the function is executed on a KeyedStream. On each access, the state exposes the value
* for the key of the element currently processed by the function.
* Each function may have multiple partitioned states, addressed with different names.
*
* <p>Because the scope of each value is the key of the currently processed element,
* and the elements are distributed by the Flink runtime, the system can transparently
* scale out and redistribute the state and KeyedStream.
*
* <p>The following code example shows how to implement a continuous counter that counts
* how many times elements of a certain key occur, and emits an updated count for that
* element on each occurrence.
*
* <pre>{@code
* DataStream<MyType> stream = ...;
* KeyedStream<MyType> keyedStream = stream.keyBy("id");
*
* keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
*
*     private ValueState<Long> state;
*
*     public void open(Configuration cfg) {
*         state = getRuntimeContext().getState(
*                 new ValueStateDescriptor<Long>("count", LongSerializer.INSTANCE, 0L));
*     }
*
*     public Tuple2<MyType, Long> map(MyType value) {
*         long count = state.value() + 1;
*         state.update(count);
*         return new Tuple2<>(value, count);
*     }
* });
* }</pre>
*
* @param stateProperties The descriptor defining the properties of the state.
*
* @param <T> The type of value stored in the state.
*
* @return The partitioned state object.
*
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@PublicEvolving
<T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);
/**
* Gets a handle to the system's key/value list state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* holds lists. One can add elements to the list, or retrieve the list as a whole.
*
* <p>This state is only accessible if the function is executed on a KeyedStream.
*
* <pre>{@code
* DataStream<MyType> stream = ...;
* KeyedStream<MyType> keyedStream = stream.keyBy("id");
*
* keyedStream.flatMap(new RichFlatMapFunction<MyType, MyType>() {
*
*     private ListState<MyType> state;
*
*     public void open(Configuration cfg) {
*         state = getRuntimeContext().getListState(
*                 new ListStateDescriptor<>("myState", MyType.class));
*     }
*
*     public void flatMap(MyType value, Collector<MyType> out) {
*         if (value.isDivider()) {
*             for (MyType t : state.get()) {
*                 out.collect(t);
*             }
*         } else {
*             state.add(value);
*         }
*     }
* });
* }</pre>
*
* @param stateProperties The descriptor defining the properties of the state.
*
* @param <T> The type of value stored in the state.
*
* @return The partitioned state object.
*
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@PublicEvolving
<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);
/**
* Gets a handle to the system's key/value reducing state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* aggregates values.
*
* <p>This state is only accessible if the function is executed on a KeyedStream.
*
* <pre>{@code
* DataStream<MyType> stream = ...;
* KeyedStream<MyType> keyedStream = stream.keyBy("id");
*
* keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
*
*     private ReducingState<Long> state;
*
*     public void open(Configuration cfg) {
*         state = getRuntimeContext().getReducingState(
*                 new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class));
*     }
*
*     public Tuple2<MyType, Long> map(MyType value) {
*         state.add(value.count());
*         return new Tuple2<>(value, state.get());
*     }
* });
*
* }</pre>
*
* @param stateProperties The descriptor defining the properties of the state.
*
* @param <T> The type of value stored in the state.
*
* @return The partitioned state object.
*
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@PublicEvolving
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);
/**
* Gets a handle to the system's key/value aggregating state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* aggregates values with different types.
*
* <p>This state is only accessible if the function is executed on a KeyedStream.
*
* <pre>{@code
* DataStream<MyType> stream = ...;
* KeyedStream<MyType> keyedStream = stream.keyBy("id");
* AggregateFunction<...> aggregateFunction = ...;
*
* keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
*
*     private AggregatingState<MyType, Long> state;
*
*     public void open(Configuration cfg) {
*         state = getRuntimeContext().getAggregatingState(
*                 new AggregatingStateDescriptor<>("sum", aggregateFunction, Long.class));
*     }
*
*     public Tuple2<MyType, Long> map(MyType value) {
*         state.add(value);
*         return new Tuple2<>(value, state.get());
*     }
* });
*
* }</pre>
*
* @param stateProperties The descriptor defining the properties of the state.
*
* @param <IN> The type of the values that are added to the state.
* @param <ACC> The type of the accumulator (intermediate aggregation state).
* @param <OUT> The type of the values that are returned from the state.
*
* @return The partitioned state object.
*
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@PublicEvolving
<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);
/**
* Gets a handle to the system's key/value folding state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* aggregates values with different types.
*
* <p>This state is only accessible if the function is executed on a KeyedStream.
*
* <pre>{@code
* DataStream<MyType> stream = ...;
* KeyedStream<MyType> keyedStream = stream.keyBy("id");
*
* keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
*
*     private FoldingState<MyType, Long> state;
*
*     public void open(Configuration cfg) {
*         state = getRuntimeContext().getFoldingState(
*                 new FoldingStateDescriptor<>("sum", 0L, (a, b) -> a.count() + b, Long.class));
*     }
*
*     public Tuple2<MyType, Long> map(MyType value) {
*         state.add(value);
*         return new Tuple2<>(value, state.get());
*     }
* });
*
* }</pre>
*
* @param stateProperties The descriptor defining the properties of the state.
*
* @param <T> Type of the values folded into the state.
* @param <ACC> Type of the value in the state.
*
* @return The partitioned state object.
*
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*
* @deprecated will be removed in a future version in favor of {@link AggregatingState}
*/
@PublicEvolving
@Deprecated
<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
/**
* Gets a handle to the system's key/value map state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* is composed of user-defined key-value pairs.
*
* <p>This state is only accessible if the function is executed on a KeyedStream.
*
* <pre>{@code
* DataStream<MyType> stream = ...;
* KeyedStream<MyType> keyedStream = stream.keyBy("id");
*
* keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
*
*     private MapState<MyType, Long> state;
*
*     public void open(Configuration cfg) {
*         state = getRuntimeContext().getMapState(
*                 new MapStateDescriptor<>("sum", MyType.class, Long.class));
*     }
*
*     public Tuple2<MyType, Long> map(MyType value) {
*         return new Tuple2<>(value, state.get(value));
*     }
* });
*
* }</pre>
*
* @param stateProperties The descriptor defining the properties of the state.
*
* @param <UK> The type of the user keys stored in the state.
* @param <UV> The type of the user values stored in the state.
*
* @return The partitioned state object.
*
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@PublicEvolving
<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
/**
* Gets a handle to the system's key/value sorted map state. This state is similar to the state
* accessed via {@link #getMapState(MapStateDescriptor)}, but the state is sorted under the given comparator.
*
* <p><b>IMPORTANT:</b> For RocksDBStateBackend, we only support {@link BytewiseComparator}.
* The order of the serialized forms under {@link BytewiseComparator} is identical to that of the values
* only when the numbers to compare are both non-negative.
*
* <p>Serializers under {@link org.apache.flink.table.typeutils.ordered} may be helpful if you want to use
* SortedMapState.
*
* <p>This state is only accessible if the function is executed on a KeyedStream.
*
* <p>The default implementation in this interface always throws an
* {@link UnsupportedOperationException}.
*
* <pre>{@code
* DataStream<MyType> stream = ...;
* KeyedStream<MyType> keyedStream = stream.keyBy("id");
*
* keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
*
*     private SortedMapState<MyType, Long> state;
*
*     public void open(Configuration cfg) {
*         state = getRuntimeContext().getSortedMapState(
*                 new SortedMapStateDescriptor<>("sum", new NaturalComparator<>(), MyType.class, Long.class));
*     }
*
*     public Tuple2<MyType, Long> map(MyType value) {
*         return new Tuple2<>(value, state.get(value));
*     }
* });
*
* }</pre>
*
* @param stateProperties The descriptor defining the properties of the state.
*
* @param <UK> The type of the user keys stored in the state.
* @param <UV> The type of the user values stored in the state.
*
* @return The partitioned state object.
*
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@PublicEvolving
default <UK, UV> SortedMapState<UK, UV> getSortedMapState(SortedMapStateDescriptor<UK, UV> stateProperties) {
throw new UnsupportedOperationException("This state is only accessible by functions executed on a KeyedStream");
}
}