blob: f8f4bb19fe40b2c770d5e469e41ec5752988cac3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.window;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.codegen.GeneratedRecordEqualiser;
import org.apache.flink.table.codegen.GeneratedSubKeyedAggsHandleFunction;
import org.apache.flink.table.runtime.functions.SubKeyedAggsHandleFunction;
import org.apache.flink.table.runtime.sort.RecordEqualiser;
import org.apache.flink.table.runtime.window.assigners.CountSlidingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.CountTumblingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.InternalTimeWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.SessionWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.window.triggers.Element;
import org.apache.flink.table.runtime.window.triggers.EventTime;
import org.apache.flink.table.runtime.window.triggers.ProcessingTime;
import org.apache.flink.table.runtime.window.triggers.Trigger;
import java.time.Duration;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* The {@link WindowOperatorBuilder} is used to build {@link WindowOperator} fluently.
*
* <pre>
* WindowOperatorBuilder
* .builder(KeyedStream)
* .tumble(Duration.ofMinutes(1)) // sliding(...), session(...)
* .withEventTime() // withProcessingTime()
* .aggregate(AggregationsFunction, accTypes, windowTypes)
* .withAllowedLateness(Duration.ZERO)
* .withSendRetraction()
* .build();
* </pre>
*/
public class WindowOperatorBuilder {
private InternalType[] inputFieldTypes;
private WindowAssigner<?> windowAssigner;
private Trigger<?> trigger;
private SubKeyedAggsHandleFunction<?> aggregateFunction;
private GeneratedSubKeyedAggsHandleFunction<?> generatedAggregateFunction;
private RecordEqualiser equaliser;
private GeneratedRecordEqualiser generatedEqualiser;
private InternalType[] accumulatorTypes;
private InternalType[] aggResultTypes;
private InternalType[] windowPropertyTypes;
private long allowedLateness = 0L;
private boolean sendRetraction = false;
private int rowtimeIndex = -1;
public static WindowOperatorBuilder builder() {
return new WindowOperatorBuilder();
}
public WindowOperatorBuilder withInputFields(InternalType[] inputFieldTypes) {
this.inputFieldTypes = inputFieldTypes;
return this;
}
public WindowOperatorBuilder tumble(Duration size) {
checkArgument(windowAssigner == null);
this.windowAssigner = TumblingWindowAssigner.of(size);
return this;
}
public WindowOperatorBuilder sliding(Duration size, Duration slide) {
checkArgument(windowAssigner == null);
this.windowAssigner = SlidingWindowAssigner.of(size, slide);
return this;
}
public WindowOperatorBuilder session(Duration sessionGap) {
checkArgument(windowAssigner == null);
this.windowAssigner = SessionWindowAssigner.withGap(sessionGap);
return this;
}
public WindowOperatorBuilder countWindow(long size) {
checkArgument(windowAssigner == null);
checkArgument(trigger == null);
this.windowAssigner = CountTumblingWindowAssigner.of(size);
this.trigger = Element.count(size);
return this;
}
public WindowOperatorBuilder countWindow(long size, long slide) {
checkArgument(windowAssigner == null);
checkArgument(trigger == null);
this.windowAssigner = CountSlidingWindowAssigner.of(size, slide);
this.trigger = Element.count(size);
return this;
}
public WindowOperatorBuilder assigner(WindowAssigner<?> windowAssigner) {
checkArgument(this.windowAssigner == null);
checkNotNull(windowAssigner);
this.windowAssigner = windowAssigner;
return this;
}
public WindowOperatorBuilder triggering(Trigger<?> trigger) {
checkNotNull(trigger);
this.trigger = trigger;
return this;
}
public WindowOperatorBuilder withEventTime(int rowtimeIndex) {
checkNotNull(windowAssigner);
checkArgument(windowAssigner instanceof InternalTimeWindowAssigner);
InternalTimeWindowAssigner timeWindowAssigner = (InternalTimeWindowAssigner) windowAssigner;
this.windowAssigner = (WindowAssigner<?>) timeWindowAssigner.withEventTime();
this.rowtimeIndex = rowtimeIndex;
if (trigger == null) {
this.trigger = EventTime.afterEndOfWindow();
}
return this;
}
public WindowOperatorBuilder withProcessingTime() {
checkNotNull(windowAssigner);
checkArgument(windowAssigner instanceof InternalTimeWindowAssigner);
InternalTimeWindowAssigner timeWindowAssigner = (InternalTimeWindowAssigner) windowAssigner;
this.windowAssigner = (WindowAssigner<?>) timeWindowAssigner.withProcessingTime();
if (trigger == null) {
this.trigger = ProcessingTime.afterEndOfWindow();
}
return this;
}
public WindowOperatorBuilder withAllowedLateness(Duration allowedLateness) {
checkArgument(!allowedLateness.isNegative());
if (allowedLateness.toMillis() > 0) {
this.allowedLateness = allowedLateness.toMillis();
// allow late element, which means this window will send retractions
this.sendRetraction = true;
}
return this;
}
public WindowOperatorBuilder aggregate(
SubKeyedAggsHandleFunction<?> aggregateFunction,
RecordEqualiser equaliser,
InternalType[] accumulatorTypes,
InternalType[] aggResultTypes,
InternalType[] windowPropertyTypes) {
ClosureCleaner.clean(aggregateFunction, true);
this.accumulatorTypes = accumulatorTypes;
this.aggResultTypes = aggResultTypes;
this.windowPropertyTypes = windowPropertyTypes;
this.aggregateFunction = checkNotNull(aggregateFunction);
this.equaliser = checkNotNull(equaliser);
return this;
}
public WindowOperatorBuilder aggregate(
GeneratedSubKeyedAggsHandleFunction<?> generatedAggregateFunction,
GeneratedRecordEqualiser generatedEqualiser,
InternalType[] accumulatorTypes,
InternalType[] aggResultTypes,
InternalType[] windowPropertyTypes) {
this.accumulatorTypes = accumulatorTypes;
this.aggResultTypes = aggResultTypes;
this.windowPropertyTypes = windowPropertyTypes;
this.generatedAggregateFunction = checkNotNull(generatedAggregateFunction);
this.generatedEqualiser = checkNotNull(generatedEqualiser);
return this;
}
public WindowOperatorBuilder withSendRetraction() {
this.sendRetraction = true;
return this;
}
public WindowOperator build() {
checkNotNull(trigger, "trigger is not set");
if (generatedAggregateFunction != null && generatedEqualiser != null) {
//noinspection unchecked
return new WindowOperator(
generatedAggregateFunction,
generatedEqualiser,
windowAssigner,
trigger,
windowAssigner.getWindowSerializer(new ExecutionConfig()),
inputFieldTypes,
accumulatorTypes,
aggResultTypes,
windowPropertyTypes,
rowtimeIndex,
sendRetraction,
allowedLateness);
} else {
//noinspection unchecked
return new WindowOperator(
aggregateFunction,
equaliser,
windowAssigner,
trigger,
windowAssigner.getWindowSerializer(new ExecutionConfig()),
inputFieldTypes,
accumulatorTypes,
aggResultTypes,
windowPropertyTypes,
rowtimeIndex,
sendRetraction,
allowedLateness);
}
}
}