// blob: 1f9d95f059227122d183eaa200fa268cd1879919 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.window.internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.keyed.KeyedMapState;
import org.apache.flink.runtime.state.keyed.KeyedMapStateDescriptor;
import org.apache.flink.table.api.window.Window;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.functions.SubKeyedAggsHandleFunction;
import org.apache.flink.table.runtime.window.assigners.MergingWindowAssigner;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
 * The implementation of {@link InternalWindowProcessFunction} for {@link MergingWindowAssigner}.
 *
 * <p>Windows produced by the assigner are added to a {@link MergingWindowSet}, which may merge
 * them with windows that were added earlier. Accumulators are always read and written under the
 * <em>state window</em> returned by {@code MergingWindowSet#getStateWindow}, while results are
 * emitted for the <em>actual</em> (possibly merged) window.
 *
 * @param <K> The type of the current key, as returned by {@code ctx.currentKey()}.
 * @param <W> The type of {@code Window} that assigner assigns.
 */
public class MergingWindowProcessFunction<K, W extends Window>
		extends InternalWindowProcessFunction<K, W> {

	private static final long serialVersionUID = -2866771637946397223L;

	private final MergingWindowAssigner<W> windowAssigner;
	private final TypeSerializer<K> keySerializer;
	private final TypeSerializer<W> windowSerializer;

	// Runtime-only members, (re)created in open() and therefore excluded from serialization.
	private transient MergingWindowSet<K, W> mergingWindows;
	private transient MergingFunctionImpl mergingFunction;

	/**
	 * Actual windows computed by the most recent {@link #assignStateNamespace} call.
	 *
	 * <p>This is per-record scratch state holding runtime window instances, so it is kept
	 * {@code transient} like the other runtime-only members: it must never be written out
	 * together with the function object.
	 */
	private transient List<W> reuseActualWindows;

	/**
	 * Creates the process function.
	 *
	 * @param windowAssigner   the merging assigner used to assign and merge windows
	 * @param windowAggregator the aggregation handler, passed on to the super class
	 * @param keySerializer    serializer of the key, used for the window-mapping state
	 * @param windowSerializer serializer of the window, used for both the map key and the map
	 *                         value of the window-mapping state
	 * @param allowedLateness  the allowed lateness, passed on to the super class
	 */
	public MergingWindowProcessFunction(
			MergingWindowAssigner<W> windowAssigner,
			SubKeyedAggsHandleFunction<W> windowAggregator,
			TypeSerializer<K> keySerializer,
			TypeSerializer<W> windowSerializer,
			long allowedLateness) {
		super(windowAssigner, windowAggregator, allowedLateness);
		this.windowAssigner = windowAssigner;
		this.keySerializer = keySerializer;
		this.windowSerializer = windowSerializer;
	}

	/**
	 * Opens the super class, then obtains the keyed map state named
	 * {@code "session-window-mapping"} and builds the {@link MergingWindowSet} and the merge
	 * callback on top of it.
	 */
	@Override
	public void open(Context<K, W> ctx) throws Exception {
		super.open(ctx);
		KeyedMapStateDescriptor<K, W, W> mappingStateDescriptor = new KeyedMapStateDescriptor<>(
			"session-window-mapping",
			keySerializer,
			windowSerializer,
			windowSerializer);
		KeyedMapState<K, W, W> windowMapping = ctx.getKeyedState(mappingStateDescriptor);
		this.mergingWindows = new MergingWindowSet<>(windowAssigner, windowMapping);
		this.mergingFunction = new MergingFunctionImpl();
	}

	/**
	 * Assigns the row to windows, adds every assigned window to the merging window set and
	 * returns the state windows of all resulting windows that are not late.
	 *
	 * <p>As a side effect the non-late actual windows are remembered and later handed out by
	 * {@link #assignActualWindows}. Late windows are retired from the merging window set.
	 *
	 * @param inputRow  the incoming row
	 * @param timestamp the timestamp passed to the window assigner
	 * @return the state windows of the non-late actual windows, in the same order
	 */
	@Override
	public Collection<W> assignStateNamespace(BaseRow inputRow, long timestamp) throws Exception {
		Collection<W> elementWindows = windowAssigner.assignWindows(inputRow, timestamp);
		mergingWindows.initializeCache(ctx.currentKey());

		// A fresh list per call: the previous list may still be referenced by the caller.
		reuseActualWindows = new ArrayList<>(1);
		for (W window : elementWindows) {
			// adding the new window might result in a merge, in that case the actualWindow
			// is the merged window and we work with that. If we don't merge then
			// actualWindow == window
			W actualWindow = mergingWindows.addWindow(ctx.currentKey(), window, mergingFunction);

			// drop if the window is already late
			if (isWindowLate(actualWindow)) {
				mergingWindows.retireWindow(ctx.currentKey(), actualWindow);
			} else {
				reuseActualWindows.add(actualWindow);
			}
		}

		// Resolve the state windows only after all additions (and thus all merges) are done.
		List<W> affectedWindows = new ArrayList<>(reuseActualWindows.size());
		for (W actual : reuseActualWindows) {
			affectedWindows.add(mergingWindows.getStateWindow(ctx.currentKey(), actual));
		}
		return affectedWindows;
	}

	/**
	 * Returns the actual windows computed by the preceding {@link #assignStateNamespace} call.
	 *
	 * <p>Neither argument is used here. The result is {@code null} if
	 * {@link #assignStateNamespace} has not been called on this instance yet.
	 */
	@Override
	public Collection<W> assignActualWindows(BaseRow inputRow, long timestamp) throws Exception {
		// the actual windows is calculated in assignStateNamespace
		return reuseActualWindows;
	}

	/**
	 * Computes the aggregation result of the given actual window.
	 *
	 * <p>The accumulators are looked up under the window's state window; if none are stored,
	 * fresh accumulators are created by the aggregator.
	 *
	 * @param window the actual window to compute the result for
	 * @return the value produced by the aggregator for {@code window}
	 */
	@Override
	public BaseRow getWindowAggregationResult(W window) throws Exception {
		W stateWindow = mergingWindows.getStateWindow(ctx.currentKey(), window);
		BaseRow acc = ctx.getWindowAccumulators(stateWindow);
		if (acc == null) {
			acc = windowAggregator.createAccumulators();
		}
		// accumulators live under the state window, the value is requested for the actual window
		windowAggregator.setAccumulators(stateWindow, acc);
		return windowAggregator.getValue(window);
	}

	/**
	 * Clears the trigger and the window state of the given window and retires it from the
	 * merging window set, but only if {@code isCleanupTime(window, currentTime)} holds.
	 *
	 * @param window      the actual window to clean up
	 * @param currentTime the time checked against the window's cleanup time
	 */
	@Override
	public void cleanWindowIfNeeded(W window, long currentTime) throws Exception {
		if (!isCleanupTime(window, currentTime)) {
			return;
		}
		ctx.clearTrigger(window);
		W stateWindow = mergingWindows.getStateWindow(ctx.currentKey(), window);
		ctx.clearWindowState(stateWindow);
		// retire expired window
		mergingWindows.initializeCache(ctx.currentKey());
		mergingWindows.retireWindow(ctx.currentKey(), window);
		// do not need to clear previous state, previous state is disabled in session window
	}

	/**
	 * Merge callback handed to {@link MergingWindowSet} when a window is added: validates the
	 * merge result, notifies the context, clears the timers of the merged windows and folds
	 * their accumulators into the resulting state window.
	 */
	private class MergingFunctionImpl implements MergingWindowSet.MergeFunction<W> {

		/**
		 * Handles one merge.
		 *
		 * @param mergeResult            the window resulting from the merge
		 * @param mergedWindows          the actual windows that were merged
		 * @param stateWindowResult      the state window that holds the merged accumulators
		 * @param stateWindowsToBeMerged the state windows whose accumulators are folded into
		 *                               {@code stateWindowResult} and then cleared
		 * @throws UnsupportedOperationException if the merge result is already late with
		 *         respect to the current watermark (event time, including allowed lateness)
		 *         or the current processing time (processing time)
		 */
		@Override
		public void merge(
				W mergeResult,
				Collection<W> mergedWindows,
				W stateWindowResult,
				Collection<W> stateWindowsToBeMerged) throws Exception {
			ensureMergeResultNotLate(mergeResult);

			ctx.onMerge(mergeResult, stateWindowsToBeMerged);

			// clear registered timers
			for (W m : mergedWindows) {
				ctx.clearTrigger(m);
				ctx.deleteCleanupTimer(m);
			}

			// merge the merged state windows into the newly resulting state window
			if (!stateWindowsToBeMerged.isEmpty()) {
				mergeAccumulators(stateWindowResult, stateWindowsToBeMerged);
			}
		}

		/** Rejects a merge whose resulting window would already be late. */
		private void ensureMergeResultNotLate(W mergeResult) throws Exception {
			if (windowAssigner.isEventTime()) {
				if (mergeResult.maxTimestamp() + allowedLateness <= ctx.currentWatermark()) {
					throw new UnsupportedOperationException("The end timestamp of an " +
						"event-time window cannot become earlier than the current watermark " +
						"by merging. Current watermark: " + ctx.currentWatermark() +
						" window: " + mergeResult);
				}
			} else if (mergeResult.maxTimestamp() <= ctx.currentProcessingTime()) {
				throw new UnsupportedOperationException("The end timestamp of a " +
					"processing-time window cannot become earlier than the current processing time " +
					"by merging. Current processing time: " + ctx.currentProcessingTime() +
					" window: " + mergeResult);
			}
		}

		/** Folds the accumulators of the given state windows into the target state window. */
		private void mergeAccumulators(
				W stateWindowResult,
				Collection<W> stateWindowsToBeMerged) throws Exception {
			BaseRow targetAcc = ctx.getWindowAccumulators(stateWindowResult);
			if (targetAcc == null) {
				targetAcc = windowAggregator.createAccumulators();
			}
			windowAggregator.setAccumulators(stateWindowResult, targetAcc);
			for (W w : stateWindowsToBeMerged) {
				BaseRow acc = ctx.getWindowAccumulators(w);
				if (acc != null) {
					windowAggregator.merge(w, acc);
				}
				// clear merged window
				ctx.clearWindowState(w);
				ctx.clearPreviousState(w);
			}
			// write back the accumulators held by the aggregator after all merges
			targetAcc = windowAggregator.getAccumulators();
			ctx.setWindowAccumulators(stateWindowResult, targetAcc);
		}
	}
}