blob: dbd366e8b66bb4e717bfe9bf6cb2c99fff3bd9d3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.window.triggers;
import org.apache.flink.api.common.functions.Merger;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.window.Window;
import java.time.Duration;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * Factory for {@link Trigger}s that react to processing-time timers.
 * The behavior can be one of the following:
 * <ul>
 * <li> fire when the processing time passes the end of the window ({@link ProcessingTime#afterEndOfWindow()}),
 * <li> fire repeatedly at a fixed processing-time interval, where the first interval starts
 * when the first element of a given window is received ({@link ProcessingTime#every(Duration)}).
 * </ul>
 * <p>In the first case, the trigger can also specify an {@code early} trigger.
 * The {@code early} trigger is responsible for specifying when the trigger should fire in the period
 * between the beginning of the window and the time when the processing time passes the end of the window.
 */
public class ProcessingTime {
/** Merger passed to {@code mergeValueState} when windows merge; it keeps the smaller of two stored timer timestamps. */
private static final Merger<Long> MERGER = new MinAmongSet();
/** Textual form of the end-of-window trigger; returned by, or used as the prefix of, the {@code toString()} of the end-of-window triggers. */
private static final String TO_STRING = "ProcessingTime.afterEndOfWindow()";
/**
 * Builds a trigger that fires when the processing time passes the end of the window.
 *
 * @param <W> type of window the trigger works on
 * @return a new {@link AfterEndOfWindow} trigger
 */
public static <W extends Window> AfterEndOfWindow<W> afterEndOfWindow() {
    final AfterEndOfWindow<W> endOfWindowTrigger = new AfterEndOfWindow<>();
    return endOfWindowTrigger;
}
/**
 * Builds a trigger that fires periodically, starting one interval after the first element
 * has been received.
 *
 * @param time the firing interval; it is converted to whole milliseconds
 * @param <W> type of window the trigger works on
 * @return a new {@link AfterFirstElementPeriodic} trigger using the given interval
 */
public static <W extends Window> AfterFirstElementPeriodic<W> every(Duration time) {
    final long intervalMillis = time.toMillis();
    return new AfterFirstElementPeriodic<>(intervalMillis);
}
/**
 * A trigger that fires at a fixed processing-time interval. The first firing is scheduled
 * one interval after the first element in the pane, and every firing schedules the next
 * one a further interval later.
 *
 * @param <W> type of window
 */
public static final class AfterFirstElementPeriodic<W extends Window> extends Trigger<W> {
    private static final long serialVersionUID = -4710472821577125673L;

    /** Firing interval in milliseconds; always positive. */
    private final long interval;
    /** Runtime context, assigned in {@link #open(TriggerContext)}. */
    private transient TriggerContext ctx;
    /** State holding the timestamp of the currently registered timer; empty until one is registered. */
    private transient ValueState<Long> nextFiring;

    /**
     * @param interval firing interval in milliseconds
     * @throws IllegalArgumentException if {@code interval} is not positive, which also happens
     *     when a sub-millisecond duration has been truncated to zero milliseconds
     */
    AfterFirstElementPeriodic(long interval) {
        checkArgument(
            interval > 0,
            "The firing interval must be positive, but was %s ms.",
            interval);
        this.interval = interval;
    }

    /** Stores the context and obtains the value state that tracks the next firing time. */
    @Override
    public void open(TriggerContext ctx) throws Exception {
        this.ctx = ctx;
        // The state name includes the interval.
        String descriptorName = "processingTime-every-" + interval;
        ValueStateDescriptor<Long> nextFiringDescriptor = new ValueStateDescriptor<>(
            descriptorName,
            Types.LONG);
        this.nextFiring = ctx.getValueState(nextFiringDescriptor);
    }

    /**
     * Registers the first timer if none is stored yet. Never fires directly.
     *
     * @return always {@code false}
     */
    @Override
    public boolean onElement(Object element, long timestamp, W window) throws Exception {
        Long timer = nextFiring.value();
        if (timer == null) {
            // No timer stored yet: schedule the first firing one interval from now.
            long nextTimer = ctx.getCurrentProcessingTime() + interval;
            ctx.registerProcessingTimeTimer(nextTimer);
            nextFiring.update(nextTimer);
        }
        return false;
    }

    /**
     * Fires if {@code time} matches the stored firing time, and schedules the next firing.
     *
     * @return {@code true} if the stored timer equals {@code time}, {@code false} otherwise
     */
    @Override
    public boolean onProcessingTime(long time, W window) throws Exception {
        Long timer = nextFiring.value();
        if (timer != null && timer == time) {
            // Schedule the next firing relative to the timer that just went off.
            long newTimer = time + interval;
            ctx.registerProcessingTimeTimer(newTimer);
            nextFiring.update(newTimer);
            return true;
        } else {
            return false;
        }
    }

    /** Event time is ignored by this trigger; always returns {@code false}. */
    @Override
    public boolean onEventTime(long time, W window) throws Exception {
        return false;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    /**
     * Merges the stored firing times using the minimum, and registers a timer for the result
     * if there is one.
     */
    @Override
    public void onMerge(W window, OnMergeContext mergeContext) throws Exception {
        Long nextTimer = mergeContext.mergeValueState(nextFiring, MERGER);
        if (nextTimer != null) {
            ctx.registerProcessingTimeTimer(nextTimer);
        }
    }

    /** Deletes the stored timer, if any, and clears the state. */
    @Override
    public void clear(W window) throws Exception {
        Long timer = nextFiring.value();
        if (timer != null) {
            ctx.deleteProcessingTimeTimer(timer);
            nextFiring.clear();
        }
    }

    @Override
    public String toString() {
        return "ProcessingTime.every(" + interval + ")";
    }
}
/**
 * A {@link Trigger} that fires once the current system time passes the end of the window
 * to which a pane belongs.
 *
 * @param <W> type of window
 */
public static final class AfterEndOfWindow<W extends Window> extends Trigger<W> {
    private static final long serialVersionUID = 2369815941792574642L;

    /**
     * Runtime context, assigned in {@link #open(TriggerContext)}. Transient because it is a
     * runtime handle and should not become part of the trigger's serialized form.
     */
    private transient TriggerContext ctx;

    /**
     * Creates a new {@code Trigger} like this one, except that it also fires whenever
     * the given {@code Trigger} fires before the processing time has passed the end of the window.
     *
     * @param earlyFirings trigger deciding the early firings; must not be null
     * @return a composite trigger combining end-of-window firing with the early trigger
     */
    public AfterEndOfWindowNoLate<W> withEarlyFirings(Trigger<W> earlyFirings) {
        checkNotNull(earlyFirings);
        return new AfterEndOfWindowNoLate<>(earlyFirings);
    }

    @Override
    public void open(TriggerContext ctx) throws Exception {
        this.ctx = ctx;
    }

    /**
     * Registers a processing-time timer at the window's max timestamp. Never fires directly.
     *
     * @return always {@code false}
     */
    @Override
    public boolean onElement(Object element, long timestamp, W window) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return false;
    }

    /** Fires only for the timer registered at the window's max timestamp. */
    @Override
    public boolean onProcessingTime(long time, W window) throws Exception {
        return time == window.maxTimestamp();
    }

    /** Event time is ignored by this trigger; always returns {@code false}. */
    @Override
    public boolean onEventTime(long time, W window) throws Exception {
        return false;
    }

    /** Deletes the end-of-window timer of the given window. */
    @Override
    public void clear(W window) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    /** Registers the end-of-window timer for the merged window. */
    @Override
    public void onMerge(W window, OnMergeContext mergeContext) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public String toString() {
        return TO_STRING;
    }
}
/**
 * A composite {@link Trigger} that consists of the end-of-window behavior and an early trigger.
 * It fires when the processing time reaches the window's max timestamp, and additionally
 * whenever the early trigger fires.
 *
 * @param <W> type of window
 */
public static final class AfterEndOfWindowNoLate<W extends Window> extends Trigger<W> {
    private static final long serialVersionUID = 2310050856564792734L;

    /** Trigger deciding the early firings; never null (enforced by the constructor). */
    private final Trigger<W> earlyTrigger;

    /**
     * Runtime context, assigned in {@link #open(TriggerContext)}. Transient because it is a
     * runtime handle and should not become part of the trigger's serialized form.
     */
    private transient TriggerContext ctx;

    /**
     * @param earlyTrigger trigger deciding the early firings; must not be null
     */
    AfterEndOfWindowNoLate(Trigger<W> earlyTrigger) {
        checkNotNull(earlyTrigger);
        this.earlyTrigger = earlyTrigger;
    }

    /** Stores the context and opens the early trigger with the same context. */
    @Override
    public void open(TriggerContext ctx) throws Exception {
        this.ctx = ctx;
        earlyTrigger.open(ctx);
    }

    /**
     * Registers the end-of-window timer and lets the early trigger decide whether to fire.
     *
     * @return the early trigger's decision for this element
     */
    @Override
    public boolean onElement(Object element, long timestamp, W window) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return earlyTrigger.onElement(element, timestamp, window);
    }

    /**
     * Fires at the end of the window, otherwise defers to the early trigger.
     *
     * @return {@code true} if {@code time} is the window's max timestamp or the early trigger fires
     */
    @Override
    public boolean onProcessingTime(long time, W window) throws Exception {
        // Short-circuit: the early trigger is not consulted for the end-of-window timer.
        return time == window.maxTimestamp() || earlyTrigger.onProcessingTime(time, window);
    }

    /** Delegates event-time timers to the early trigger. */
    @Override
    public boolean onEventTime(long time, W window) throws Exception {
        return earlyTrigger.onEventTime(time, window);
    }

    /** Merging is supported only if the early trigger supports it. */
    @Override
    public boolean canMerge() {
        return earlyTrigger.canMerge();
    }

    /** Registers the end-of-window timer for the merged window, then merges the early trigger. */
    @Override
    public void onMerge(W window, OnMergeContext mergeContext) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        earlyTrigger.onMerge(window, mergeContext);
    }

    /** Deletes the end-of-window timer, then clears the early trigger. */
    @Override
    public void clear(W window) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
        earlyTrigger.clear(window);
    }

    @Override
    public String toString() {
        // earlyTrigger is final and null-checked in the constructor, so no null guard is needed.
        return TO_STRING + ".withEarlyFirings(" + earlyTrigger + ")";
    }
}
// ===================================================================================
// Utils
// ===================================================================================
/**
 * Merger that yields the smaller of two values. A {@code null} operand is treated as
 * absent, so the other operand is returned as is (which may itself be {@code null}).
 */
private static final class MinAmongSet implements Merger<Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long merge(Long value1, Long value2) {
        if (value1 == null) {
            // Covers both "only value1 missing" and "both missing" (returns null).
            return value2;
        }
        if (value2 == null) {
            return value1;
        }
        return Math.min(value1, value2);
    }
}
}