blob: 62d1002efae83a3b88c86c510165fcef47141bf0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.window;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.Merger;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.keyed.KeyedState;
import org.apache.flink.runtime.state.keyed.KeyedStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueStateDescriptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.types.DataTypes;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.api.types.RowType;
import org.apache.flink.table.api.window.Window;
import org.apache.flink.table.codegen.GeneratedRecordEqualiser;
import org.apache.flink.table.codegen.GeneratedSubKeyedAggsHandleFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.functions.ExecutionContextImpl;
import org.apache.flink.table.runtime.functions.SubKeyedAggsHandleFunction;
import org.apache.flink.table.runtime.sort.RecordEqualiser;
import org.apache.flink.table.runtime.window.assigners.MergingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.PanedWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.window.internal.GeneralWindowProcessFunction;
import org.apache.flink.table.runtime.window.internal.InternalWindowProcessFunction;
import org.apache.flink.table.runtime.window.internal.MergingWindowProcessFunction;
import org.apache.flink.table.runtime.window.internal.PanedWindowProcessFunction;
import org.apache.flink.table.runtime.window.triggers.Trigger;
import org.apache.commons.lang3.ArrayUtils;
import java.util.Collection;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An operator that implements the logic for windowing based on a {@link WindowAssigner} and
* {@link Trigger}.
*
* <p>When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
* assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element
* is put into panes. A pane is the bucket of elements that have the same key and same
* {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
* {@code WindowAssigner}.
*
* <p>Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
* the contents of the pane should be processed to emit results. When a trigger fires,
* the given {@link org.apache.flink.table.runtime.functions.AggsHandleFunction}
* is invoked to produce the results that are emitted for the pane to which the {@code Trigger} belongs.
*
* <p>The parameter types:
* {@code <IN>}: BaseRow
* {@code <OUT>}: JoinedRow(KEY, AGG_RESULT)
* {@code <KEY>}: GenericRow
* {@code <AGG_RESULT>}: GenericRow
* {@code <ACC>}: GenericRow
*
* @param <K> The type of key returned by the {@code KeySelector}.
* @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
*/
public class WindowOperator<K, W extends Window>
extends AbstractStreamOperator<BaseRow>
implements OneInputStreamOperator<BaseRow, BaseRow>, Triggerable<K, W> {
private static final long serialVersionUID = 1L;
private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
// ------------------------------------------------------------------------
// Configuration values and user functions
// ------------------------------------------------------------------------
private final WindowAssigner<W> windowAssigner;
private final Trigger<W> trigger;
/**
* For serializing the window in checkpoints.
*/
private final TypeSerializer<W> windowSerializer;
/** For serializing the key in checkpoints. */
private TypeSerializer<K> keySerializer;
private final boolean sendRetraction;
private final InternalType[] inputFieldTypes;
private final InternalType[] accumulatorTypes;
private final InternalType[] aggResultTypes;
private final InternalType[] windowPropertyTypes;
private final int rowtimeIndex;
/**
* The allowed lateness for elements. This is used for:
* <ul>
* <li>Deciding if an element should be dropped from a window due to lateness.
* <li>Clearing the state of a window if the system time passes the
* {@code window.maxTimestamp + allowedLateness} landmark.
* </ul>
*/
private final long allowedLateness;
// --------------------------------------------------------------------------------
private SubKeyedAggsHandleFunction<W> windowAggregator;
private GeneratedSubKeyedAggsHandleFunction<W> generatedWindowAggregator;
/** The util to compare two BaseRow equals to each other.
* As different BaseRow can't be equals directly, we use a code generated util to handle this.
*/
private RecordEqualiser equaliser;
private GeneratedRecordEqualiser generatedEqualiser;
// --------------------------------------------------------------------------------
private transient InternalWindowProcessFunction<K, W> windowFunction;
/**
* This is given to the {@code InternalWindowFunction} for emitting elements with a given
* timestamp.
*/
private transient TimestampedCollector<BaseRow> collector;
/** Flag to prevent duplicate function.close() calls in close() and dispose(). */
private transient boolean functionsClosed = false;
private transient Counter numLateRecordsDropped;
private transient Meter lateRecordsDroppedRate;
private transient Gauge<Long> watermarkLatency;
private transient InternalTimerService<W> internalTimerService;
private transient SubKeyedValueState<K, W, BaseRow> windowState;
private transient SubKeyedValueState<K, W, BaseRow> previousState;
private transient TriggerContext triggerContext;
private transient JoinedRow reuseOutput;
WindowOperator(
SubKeyedAggsHandleFunction<W> windowAggregator,
RecordEqualiser equaliser,
WindowAssigner<W> windowAssigner,
Trigger<W> trigger,
TypeSerializer<W> windowSerializer,
InternalType[] inputFieldTypes,
InternalType[] accumulatorTypes,
InternalType[] aggResultTypes,
InternalType[] windowPropertyTypes,
int rowtimeIndex,
boolean sendRetraction,
long allowedLateness) {
checkArgument(allowedLateness >= 0);
this.windowAggregator = checkNotNull(windowAggregator);
this.equaliser = checkNotNull(equaliser);
this.windowAssigner = checkNotNull(windowAssigner);
this.trigger = checkNotNull(trigger);
this.windowSerializer = checkNotNull(windowSerializer);
this.inputFieldTypes = checkNotNull(inputFieldTypes);
this.accumulatorTypes = checkNotNull(accumulatorTypes);
this.aggResultTypes = checkNotNull(aggResultTypes);
this.windowPropertyTypes = checkNotNull(windowPropertyTypes);
this.allowedLateness = allowedLateness;
this.sendRetraction = sendRetraction;
// rowtime index should >= 0 when in event time mode
checkArgument(!windowAssigner.isEventTime() || rowtimeIndex >= 0);
this.rowtimeIndex = rowtimeIndex;
setChainingStrategy(ChainingStrategy.ALWAYS);
}
WindowOperator(
GeneratedSubKeyedAggsHandleFunction<W> generatedWindowAggregator,
GeneratedRecordEqualiser generatedEqualiser,
WindowAssigner<W> windowAssigner,
Trigger<W> trigger,
TypeSerializer<W> windowSerializer,
InternalType[] inputFieldTypes,
InternalType[] accumulatorTypes,
InternalType[] aggResultTypes,
InternalType[] windowPropertyTypes,
int rowtimeIndex,
boolean sendRetraction,
long allowedLateness) {
checkArgument(allowedLateness >= 0);
this.generatedWindowAggregator = checkNotNull(generatedWindowAggregator);
this.generatedEqualiser = checkNotNull(generatedEqualiser);
this.windowAssigner = checkNotNull(windowAssigner);
this.trigger = checkNotNull(trigger);
this.windowSerializer = checkNotNull(windowSerializer);
this.inputFieldTypes = checkNotNull(inputFieldTypes);
this.accumulatorTypes = checkNotNull(accumulatorTypes);
this.aggResultTypes = checkNotNull(aggResultTypes);
this.windowPropertyTypes = checkNotNull(windowPropertyTypes);
this.allowedLateness = allowedLateness;
this.sendRetraction = sendRetraction;
// rowtime index should >= 0 when in event time mode
checkArgument(!windowAssigner.isEventTime() || rowtimeIndex >= 0);
this.rowtimeIndex = rowtimeIndex;
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@Override
public void open() throws Exception {
super.open();
//noinspection unchecked
this.keySerializer = (TypeSerializer<K>) getKeySerializer();
collector = new TimestampedCollector<>(output);
collector.eraseTimestamp();
internalTimerService =
getInternalTimerService("window-timers", windowSerializer, this);
triggerContext = new TriggerContext();
triggerContext.open();
RowType accTypeInfo = new RowType(accumulatorTypes);
SubKeyedValueStateDescriptor<K, W, BaseRow> windowStateDescriptor = new SubKeyedValueStateDescriptor<>(
"window-aggs",
keySerializer,
windowSerializer,
(TypeSerializer<BaseRow>) DataTypes.createInternalSerializer(accTypeInfo));
this.windowState = getSubKeyedState(windowStateDescriptor);
if (sendRetraction) {
InternalType[] valueTypes = ArrayUtils.addAll(aggResultTypes, windowPropertyTypes);
RowType prevAggResultType = new RowType(valueTypes);
SubKeyedValueStateDescriptor<K, W, BaseRow> previousStateDescriptor = new SubKeyedValueStateDescriptor<>(
"previous-aggs",
keySerializer,
windowSerializer,
(TypeSerializer<BaseRow>) DataTypes.createInternalSerializer(prevAggResultType));
this.previousState = getSubKeyedState(previousStateDescriptor);
}
// compile aggregator
if (generatedWindowAggregator != null) {
// the type cast is needed here, otherwise compile will complain
this.windowAggregator = (SubKeyedAggsHandleFunction<W>) generatedWindowAggregator.newInstance(
getRuntimeContext().getUserCodeClassLoader());
}
// compile equaliser
if (generatedEqualiser != null) {
// the type cast is needed here, otherwise compile will complain
this.equaliser = (RecordEqualiser) generatedEqualiser.newInstance(
getRuntimeContext().getUserCodeClassLoader());
}
WindowContext windowContext = new WindowContext();
windowAggregator.open(new ExecutionContextImpl(this, getRuntimeContext(), windowSerializer));
if (windowAssigner instanceof MergingWindowAssigner) {
this.windowFunction = new MergingWindowProcessFunction<>(
(MergingWindowAssigner<W>) windowAssigner,
windowAggregator,
keySerializer,
windowSerializer,
allowedLateness);
} else if (windowAssigner instanceof PanedWindowAssigner) {
this.windowFunction = new PanedWindowProcessFunction<>(
(PanedWindowAssigner<W>) windowAssigner,
windowAggregator,
allowedLateness);
} else {
this.windowFunction = new GeneralWindowProcessFunction<>(
windowAssigner,
windowAggregator,
allowedLateness);
}
windowFunction.open(windowContext);
reuseOutput = new JoinedRow();
// metrics
this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
this.lateRecordsDroppedRate = metrics.meter(LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
this.watermarkLatency = metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
long watermark = internalTimerService.currentWatermark();
if (watermark < 0) {
return 0L;
} else {
return internalTimerService.currentProcessingTime() - watermark;
}
});
}
@Override
public void close() throws Exception {
super.close();
collector = null;
triggerContext = null;
functionsClosed = true;
windowAggregator.close();
}
@Override
public void dispose() throws Exception {
super.dispose();
collector = null;
triggerContext = null;
if (!functionsClosed) {
functionsClosed = true;
windowAggregator.close();
}
}
@Override
public void processElement(StreamRecord<BaseRow> record) throws Exception {
// prepare inputRow and timestamp
GenericRow inputRow = BaseRowUtil.toGenericRow(record.getValue(), inputFieldTypes);
long timestamp;
if (windowAssigner.isEventTime()) {
timestamp = inputRow.getLong(rowtimeIndex);
} else {
timestamp = internalTimerService.currentProcessingTime();
}
// the windows which the input row should be placed into
Collection<W> affectedWindows = windowFunction.assignStateNamespace(inputRow, timestamp);
boolean isElementDropped = true;
for (W window : affectedWindows) {
isElementDropped = false;
BaseRow acc = windowState.get(currentKey(), window);
if (acc == null) {
acc = windowAggregator.createAccumulators();
}
windowAggregator.setAccumulators(window, acc);
if (BaseRowUtil.isAccumulateMsg(inputRow)) {
windowAggregator.accumulate(inputRow);
} else {
windowAggregator.retract(inputRow);
}
acc = windowAggregator.getAccumulators();
windowState.put(currentKey(), window, acc);
}
// the actual window which the input row is belongs to
Collection<W> actualWindows = windowFunction.assignActualWindows(inputRow, timestamp);
for (W window : actualWindows) {
isElementDropped = false;
triggerContext.window = window;
boolean triggerResult = triggerContext.onElement(inputRow, timestamp);
if (triggerResult) {
emitWindowResult(window);
}
// register a clean up timer for the window
registerCleanupTimer(window);
}
if (isElementDropped) {
numLateRecordsDropped.inc();
lateRecordsDroppedRate.markEvent();
}
}
@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
setCurrentKey(timer.getKey());
triggerContext.window = timer.getNamespace();
if (triggerContext.onEventTime(timer.getTimestamp())) {
// fire
emitWindowResult(triggerContext.window);
}
if (windowAssigner.isEventTime()) {
windowFunction.cleanWindowIfNeeded(triggerContext.window, timer.getTimestamp());
}
}
@Override
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
setCurrentKey(timer.getKey());
triggerContext.window = timer.getNamespace();
if (triggerContext.onProcessingTime(timer.getTimestamp())) {
// fire
emitWindowResult(triggerContext.window);
}
if (!windowAssigner.isEventTime()) {
windowFunction.cleanWindowIfNeeded(triggerContext.window, timer.getTimestamp());
}
}
/**
* Emits the window result of the given window.
*/
private void emitWindowResult(W window) throws Exception {
BaseRow aggResult = windowFunction.getWindowAggregationResult(window);
if (sendRetraction) {
BaseRow previousAggResult = previousState.get(currentKey(), window);
// has emitted result for the window
if (previousAggResult != null) {
// current agg is not equal to the previous emitted, should emit retract
if (!equaliser.equalsWithoutHeader(aggResult, previousAggResult)) {
reuseOutput.replace((BaseRow) getCurrentKey(), previousAggResult);
BaseRowUtil.setRetract(reuseOutput);
// send retraction
collector.collect(reuseOutput);
// send accumulate
reuseOutput.replace((BaseRow) getCurrentKey(), aggResult);
BaseRowUtil.setAccumulate(reuseOutput);
collector.collect(reuseOutput);
// update previousState
previousState.put(currentKey(), window, aggResult);
}
// if the previous agg equals to the current agg, no need to send retract and accumulate
}
// the first fire for the window, only send accumulate
else {
// send accumulate
reuseOutput.replace((BaseRow) getCurrentKey(), aggResult);
BaseRowUtil.setAccumulate(reuseOutput);
collector.collect(reuseOutput);
// update previousState
previousState.put(currentKey(), window, aggResult);
}
} else {
reuseOutput.replace((BaseRow) getCurrentKey(), aggResult);
// no need to set header
collector.collect(reuseOutput);
}
}
@Override
public void endInput() throws Exception {
// nothing to do
}
/**
* Registers a timer to cleanup the content of the window.
*
* @param window the window whose state to discard
*/
private void registerCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
if (cleanupTime == Long.MAX_VALUE) {
// don't set a GC timer for "end of time"
return;
}
if (windowAssigner.isEventTime()) {
triggerContext.registerEventTimeTimer(cleanupTime);
} else {
triggerContext.registerProcessingTimeTimer(cleanupTime);
}
}
/**
* Returns the cleanup time for a window, which is
* {@code window.maxTimestamp + allowedLateness}. In
* case this leads to a value greated than {@link Long#MAX_VALUE}
* then a cleanup time of {@link Long#MAX_VALUE} is
* returned.
*
* @param window the window whose cleanup time we are computing.
*/
private long cleanupTime(W window) {
if (windowAssigner.isEventTime()) {
long cleanupTime = Math.max(0, window.maxTimestamp() + allowedLateness);
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
} else {
return Math.max(0, window.maxTimestamp());
}
}
@SuppressWarnings("unchecked")
private K currentKey() {
return (K) getCurrentKey();
}
// ------------------------------------------------------------------------------
/**
* Context of window.
*/
private class WindowContext implements InternalWindowProcessFunction.Context<K, W> {
@Override
public <S extends State> S getKeyedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
return WindowOperator.this.getPartitionedState(stateDescriptor);
}
@Override
public <V, S extends SubKeyedState<K, W, V>> S getSubKeyedState(SubKeyedStateDescriptor<K, W, V, S> descriptor) throws Exception {
return WindowOperator.this.getSubKeyedState(descriptor);
}
@Override
public <V, S extends KeyedState<K, V>> S getKeyedState(KeyedStateDescriptor<K, V, S> descriptor) throws Exception {
return WindowOperator.this.getKeyedState(descriptor);
}
@Override
public K currentKey() {
return WindowOperator.this.currentKey();
}
@Override
public long currentProcessingTime() {
return internalTimerService.currentProcessingTime();
}
@Override
public long currentWatermark() {
return internalTimerService.currentWatermark();
}
@Override
public BaseRow getWindowAccumulators(W window) throws Exception {
return windowState.get(currentKey(), window);
}
@Override
public void setWindowAccumulators(W window, BaseRow acc) throws Exception {
windowState.put(currentKey(), window, acc);
}
@Override
public void clearWindowState(W window) throws Exception {
windowState.remove(currentKey(), window);
windowAggregator.cleanup(window);
}
@Override
public void clearPreviousState(W window) throws Exception {
if (previousState != null) {
previousState.remove(currentKey(), window);
}
}
@Override
public void clearTrigger(W window) throws Exception {
triggerContext.window = window;
triggerContext.clear();
}
@Override
public void deleteCleanupTimer(W window) throws Exception {
long cleanupTime = cleanupTime(window);
if (cleanupTime == Long.MAX_VALUE) {
// no need to clean up because we didn't set one
return;
}
if (windowAssigner.isEventTime()) {
triggerContext.deleteEventTimeTimer(cleanupTime);
} else {
triggerContext.deleteProcessingTimeTimer(cleanupTime);
}
}
@Override
public void onMerge(W newWindow, Collection<W> mergedWindows) throws Exception {
triggerContext.window = newWindow;
triggerContext.mergedWindows = mergedWindows;
triggerContext.onMerge();
}
}
/**
* {@code TriggerContext} is a utility for handling {@code Trigger} invocations. It can be reused
* by setting the {@code key} and {@code window} fields. No internal state must be kept in
* the {@code TriggerContext}
*/
private class TriggerContext implements Trigger.OnMergeContext {
protected W window;
protected Collection<W> mergedWindows;
public void open() throws Exception {
trigger.open(this);
}
public boolean onElement(BaseRow row, long timestamp) throws Exception {
return trigger.onElement(row, timestamp, window);
}
public boolean onProcessingTime(long time) throws Exception {
return trigger.onProcessingTime(time, window);
}
public boolean onEventTime(long time) throws Exception {
return trigger.onEventTime(time, window);
}
public void onMerge() throws Exception {
trigger.onMerge(window, this);
}
@Override
public <T> ValueState<T> getValueState(ValueStateDescriptor<T> stateDescriptor) throws Exception {
stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
SubKeyedValueStateDescriptor<K, W, T> descriptor = new SubKeyedValueStateDescriptor<>(
stateDescriptor.getName(),
keySerializer,
windowSerializer,
stateDescriptor.getSerializer());
SubKeyedValueState<K, W, T> valueState = WindowOperator.this.getSubKeyedState(descriptor);
return new WindowScopeValueState<>(valueState);
}
@Override
public <T> T mergeValueState(ValueState<T> state, Merger<T> merger) {
SubKeyedValueState<K, W, T> keyedState = ((WindowScopeValueState<T>) state).keyedState;
if (mergedWindows != null && mergedWindows.size() > 0) {
T targetValue = keyedState.get(currentKey(), window);
for (W mergedWindow : mergedWindows) {
T value = keyedState.get(currentKey(), mergedWindow);
targetValue = merger.merge(targetValue, value);
}
keyedState.put(currentKey(), window, targetValue);
return targetValue;
}
return null;
}
@Override
public long getCurrentProcessingTime() {
return internalTimerService.currentProcessingTime();
}
@Override
public long getCurrentWatermark() {
return internalTimerService.currentWatermark();
}
@Override
public MetricGroup getMetricGroup() {
return WindowOperator.this.getMetricGroup();
}
@Override
public void registerProcessingTimeTimer(long time) {
internalTimerService.registerProcessingTimeTimer(window, time);
}
@Override
public void registerEventTimeTimer(long time) {
internalTimerService.registerEventTimeTimer(window, time);
}
@Override
public void deleteProcessingTimeTimer(long time) {
internalTimerService.deleteProcessingTimeTimer(window, time);
}
@Override
public void deleteEventTimeTimer(long time) {
internalTimerService.deleteEventTimeTimer(window, time);
}
public void clear() throws Exception {
trigger.clear(window);
}
}
// ------------------------------------------------------------------------------
// Window Scope State
// ------------------------------------------------------------------------------
private class WindowScopeValueState<T> implements ValueState<T> {
// internal state
private final SubKeyedValueState<K, W, T> keyedState;
private WindowScopeValueState(SubKeyedValueState<K, W, T> keyedState) {
this.keyedState = keyedState;
}
@Override
public T value() {
return keyedState.get(currentKey(), triggerContext.window);
}
@Override
public void update(T value) {
keyedState.put(currentKey(), triggerContext.window, value);
}
@Override
public void clear() {
keyedState.remove(currentKey(), triggerContext.window);
}
}
// ------------------------------------------------------------------------------
// Visible For Testing
// ------------------------------------------------------------------------------
protected Counter getNumLateRecordsDropped() {
return numLateRecordsDropped;
}
protected Gauge<Long> getWatermarkLatency() {
return watermarkLatency;
}
@Override
public boolean requireState() {
return true;
}
}