blob: 9d061cce66e797ffea7146d798b27972a4ffd3da [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.window.grouping;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.functions.DateTimeFunctions;
import org.apache.flink.table.runtime.util.RowIterator;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
* Assigning windows from the sorted input buffers.
*/
public abstract class AbstractWindowsGrouping {
private final long windowStartOffset;
private final long windowSize;
private final long slideSize;
private final int timeIndex;
private long watermark;
private TimeWindow nextWindow;
private TimeWindow currentWindow;
private int triggerWindowStartIndex;
private boolean emptyWindowTriggered;
private boolean isDate;
AbstractWindowsGrouping(long windowSize, long slideSize, int timestampIndex, boolean isDate) {
this(0L, windowSize, slideSize, timestampIndex, isDate);
}
AbstractWindowsGrouping(
long offset, long windowSize, long slideSize, int timeIndex, boolean isDate) {
this.windowStartOffset = offset;
this.windowSize = windowSize;
this.slideSize = slideSize;
this.timeIndex = timeIndex;
this.isDate = isDate;
nextWindow = null;
watermark = Long.MIN_VALUE;
// index to define
triggerWindowStartIndex = 0;
emptyWindowTriggered = true;
createBuffer();
}
public void reset() {
nextWindow = null;
watermark = Long.MIN_VALUE;
triggerWindowStartIndex = 0;
emptyWindowTriggered = true;
resetBuffer();
}
public void close() {
}
public void addInputToBuffer(BinaryRow input) throws IOException {
if (!input.isNullAt(timeIndex)) {
addIntoBuffer(input.copy());
advanceWatermark(getTimeValue(input));
}
}
/**
* Advance the watermark to trigger all the possible windows.
* It is designed to be idempotent.
*/
public void advanceWatermarkToTriggerAllWindows() {
skipEmptyWindow();
advanceWatermark(watermark + windowSize);
}
/**
* Check if there are windows could be triggered according to the current watermark.
*
* @return true when there are windows to be triggered.
* It is designed to be idempotent.
*/
public boolean hasTriggerWindow() {
skipEmptyWindow();
Preconditions.checkState(watermark == Long.MIN_VALUE || nextWindow != null,
"next trigger window cannot be null.");
return nextWindow != null && nextWindow.getEnd() <= watermark;
}
/**
* @return the iterator of the next triggerable window's elements.
*/
public RowIterator<BinaryRow> buildTriggerWindowElementsIterator() {
currentWindow = nextWindow;
// It is illegal to call this method after [[hasTriggerWindow()]] has returned `false`.
Preconditions.checkState(watermark == Long.MIN_VALUE || nextWindow != null,
"next trigger window cannot be null.");
if (nextWindow.getEnd() > watermark) {
throw new IllegalStateException("invalid window triggered " + currentWindow);
}
// advance in the stride of slideSize for hasTriggerWindow
nextWindow = TimeWindow.of(currentWindow.getStart() + slideSize,
currentWindow.getStart() + slideSize + windowSize);
// build trigger window elements' iterator
emptyWindowTriggered = true;
onBufferEvict(triggerWindowStartIndex);
return new WindowsElementsIterator(newBufferIterator(triggerWindowStartIndex));
}
/**
* @return the last triggered window.
*/
public TimeWindow getTriggerWindow() {
return currentWindow;
}
private boolean belongsToCurrentWindow(BinaryRow element) {
long currentTimestamp = getTimeValue(element);
if (currentTimestamp >= currentWindow.getStart() &&
currentTimestamp < currentWindow.getEnd()) {
// evict elements for next window
evictForWindow(element, nextWindow);
return true;
}
return false;
}
private boolean evictForWindow(BinaryRow element, TimeWindow window) {
if (getTimeValue(element) < window.getStart()) {
triggerWindowStartIndex++;
return true;
} else {
return false;
}
}
private void advanceWatermark(long timestamp) {
watermark = timestamp;
}
private void skipEmptyWindow() {
if (emptyWindowTriggered && watermark != Long.MIN_VALUE) {
nextWindow = advanceNextWindowByWatermark(watermark);
emptyWindowTriggered = false;
}
}
private TimeWindow advanceNextWindowByWatermark(long watermark) {
int maxOverlapping = (int) Math.ceil(windowSize * 1.0 / slideSize);
long start = getWindowStartWithOffset(watermark, windowStartOffset, slideSize);
for (int i = 1; i < maxOverlapping; i++) {
long nextStart = start - slideSize;
if (nextStart + windowSize > watermark) {
start = nextStart;
} else {
break;
}
}
if (nextWindow == null || start > nextWindow.getStart()) {
// This check is used in the case of jumping window.
return TimeWindow.of(start, start + windowSize);
} else {
return nextWindow;
}
}
private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
long remainder = (timestamp - offset) % windowSize;
// handle both positive and negative cases
if (remainder < 0) {
return timestamp - (remainder + windowSize);
} else {
return timestamp - remainder;
}
}
private long getTimeValue(BaseRow row) {
if (isDate) {
return row.getInt(timeIndex) * DateTimeFunctions.MILLIS_PER_DAY();
} else {
return row.getLong(timeIndex);
}
}
/**
*
*/
class WindowsElementsIterator implements RowIterator<BinaryRow> {
private final RowIterator<BinaryRow> bufferIterator;
private BinaryRow next;
WindowsElementsIterator(RowIterator<BinaryRow> iterator) {
this.bufferIterator = iterator;
}
@Override
public boolean advanceNext() {
// find the first element belongs to the current window,
// evict elements from current window, used in the case of jumping window cases.
do {
if (bufferIterator.advanceNext()) {
next = bufferIterator.getRow();
} else {
next = null;
return false;
}
} while (evictForWindow(next, currentWindow));
// Find the last element belongs to the current window.
if (belongsToCurrentWindow(next)) {
emptyWindowTriggered = false;
return true;
} else {
next = null;
return false;
}
}
@Override
public BinaryRow getRow() {
return next;
}
}
protected abstract void createBuffer();
protected abstract void resetBuffer();
protected abstract void addIntoBuffer(BinaryRow input) throws IOException;
protected abstract void onBufferEvict(int limitIndex);
protected abstract RowIterator<BinaryRow> newBufferIterator(int startIndex);
}