| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.table.runtime.window; |
| |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.java.ClosureCleaner; |
| import org.apache.flink.table.api.types.InternalType; |
| import org.apache.flink.table.codegen.GeneratedRecordEqualiser; |
| import org.apache.flink.table.codegen.GeneratedSubKeyedAggsHandleFunction; |
| import org.apache.flink.table.runtime.functions.SubKeyedAggsHandleFunction; |
| import org.apache.flink.table.runtime.sort.RecordEqualiser; |
| import org.apache.flink.table.runtime.window.assigners.CountSlidingWindowAssigner; |
| import org.apache.flink.table.runtime.window.assigners.CountTumblingWindowAssigner; |
| import org.apache.flink.table.runtime.window.assigners.InternalTimeWindowAssigner; |
| import org.apache.flink.table.runtime.window.assigners.SessionWindowAssigner; |
| import org.apache.flink.table.runtime.window.assigners.SlidingWindowAssigner; |
| import org.apache.flink.table.runtime.window.assigners.TumblingWindowAssigner; |
| import org.apache.flink.table.runtime.window.assigners.WindowAssigner; |
| import org.apache.flink.table.runtime.window.triggers.Element; |
| import org.apache.flink.table.runtime.window.triggers.EventTime; |
| import org.apache.flink.table.runtime.window.triggers.ProcessingTime; |
| import org.apache.flink.table.runtime.window.triggers.Trigger; |
| |
| import java.time.Duration; |
| |
| import static org.apache.flink.util.Preconditions.checkArgument; |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** |
| * The {@link WindowOperatorBuilder} is used to build {@link WindowOperator} fluently. |
| * |
| * <pre> |
| * WindowOperatorBuilder |
| * .builder(KeyedStream) |
| * .tumble(Duration.ofMinutes(1)) // sliding(...), session(...) |
| * .withEventTime() // withProcessingTime() |
| * .aggregate(AggregationsFunction, accTypes, windowTypes) |
| * .withAllowedLateness(Duration.ZERO) |
| * .withSendRetraction() |
| * .build(); |
| * </pre> |
| */ |
| public class WindowOperatorBuilder { |
| private InternalType[] inputFieldTypes; |
| private WindowAssigner<?> windowAssigner; |
| private Trigger<?> trigger; |
| private SubKeyedAggsHandleFunction<?> aggregateFunction; |
| private GeneratedSubKeyedAggsHandleFunction<?> generatedAggregateFunction; |
| private RecordEqualiser equaliser; |
| private GeneratedRecordEqualiser generatedEqualiser; |
| private InternalType[] accumulatorTypes; |
| private InternalType[] aggResultTypes; |
| private InternalType[] windowPropertyTypes; |
| private long allowedLateness = 0L; |
| private boolean sendRetraction = false; |
| private int rowtimeIndex = -1; |
| |
| public static WindowOperatorBuilder builder() { |
| return new WindowOperatorBuilder(); |
| } |
| |
| public WindowOperatorBuilder withInputFields(InternalType[] inputFieldTypes) { |
| this.inputFieldTypes = inputFieldTypes; |
| return this; |
| } |
| |
| public WindowOperatorBuilder tumble(Duration size) { |
| checkArgument(windowAssigner == null); |
| this.windowAssigner = TumblingWindowAssigner.of(size); |
| return this; |
| } |
| |
| public WindowOperatorBuilder sliding(Duration size, Duration slide) { |
| checkArgument(windowAssigner == null); |
| this.windowAssigner = SlidingWindowAssigner.of(size, slide); |
| return this; |
| } |
| |
| public WindowOperatorBuilder session(Duration sessionGap) { |
| checkArgument(windowAssigner == null); |
| this.windowAssigner = SessionWindowAssigner.withGap(sessionGap); |
| return this; |
| } |
| |
| public WindowOperatorBuilder countWindow(long size) { |
| checkArgument(windowAssigner == null); |
| checkArgument(trigger == null); |
| this.windowAssigner = CountTumblingWindowAssigner.of(size); |
| this.trigger = Element.count(size); |
| return this; |
| } |
| |
| public WindowOperatorBuilder countWindow(long size, long slide) { |
| checkArgument(windowAssigner == null); |
| checkArgument(trigger == null); |
| this.windowAssigner = CountSlidingWindowAssigner.of(size, slide); |
| this.trigger = Element.count(size); |
| return this; |
| } |
| |
| public WindowOperatorBuilder assigner(WindowAssigner<?> windowAssigner) { |
| checkArgument(this.windowAssigner == null); |
| checkNotNull(windowAssigner); |
| this.windowAssigner = windowAssigner; |
| return this; |
| } |
| |
| public WindowOperatorBuilder triggering(Trigger<?> trigger) { |
| checkNotNull(trigger); |
| this.trigger = trigger; |
| return this; |
| } |
| |
| public WindowOperatorBuilder withEventTime(int rowtimeIndex) { |
| checkNotNull(windowAssigner); |
| checkArgument(windowAssigner instanceof InternalTimeWindowAssigner); |
| InternalTimeWindowAssigner timeWindowAssigner = (InternalTimeWindowAssigner) windowAssigner; |
| this.windowAssigner = (WindowAssigner<?>) timeWindowAssigner.withEventTime(); |
| this.rowtimeIndex = rowtimeIndex; |
| if (trigger == null) { |
| this.trigger = EventTime.afterEndOfWindow(); |
| } |
| return this; |
| } |
| |
| public WindowOperatorBuilder withProcessingTime() { |
| checkNotNull(windowAssigner); |
| checkArgument(windowAssigner instanceof InternalTimeWindowAssigner); |
| InternalTimeWindowAssigner timeWindowAssigner = (InternalTimeWindowAssigner) windowAssigner; |
| this.windowAssigner = (WindowAssigner<?>) timeWindowAssigner.withProcessingTime(); |
| if (trigger == null) { |
| this.trigger = ProcessingTime.afterEndOfWindow(); |
| } |
| return this; |
| } |
| |
| public WindowOperatorBuilder withAllowedLateness(Duration allowedLateness) { |
| checkArgument(!allowedLateness.isNegative()); |
| if (allowedLateness.toMillis() > 0) { |
| this.allowedLateness = allowedLateness.toMillis(); |
| // allow late element, which means this window will send retractions |
| this.sendRetraction = true; |
| } |
| return this; |
| } |
| |
| public WindowOperatorBuilder aggregate( |
| SubKeyedAggsHandleFunction<?> aggregateFunction, |
| RecordEqualiser equaliser, |
| InternalType[] accumulatorTypes, |
| InternalType[] aggResultTypes, |
| InternalType[] windowPropertyTypes) { |
| |
| ClosureCleaner.clean(aggregateFunction, true); |
| this.accumulatorTypes = accumulatorTypes; |
| this.aggResultTypes = aggResultTypes; |
| this.windowPropertyTypes = windowPropertyTypes; |
| this.aggregateFunction = checkNotNull(aggregateFunction); |
| this.equaliser = checkNotNull(equaliser); |
| return this; |
| } |
| |
| public WindowOperatorBuilder aggregate( |
| GeneratedSubKeyedAggsHandleFunction<?> generatedAggregateFunction, |
| GeneratedRecordEqualiser generatedEqualiser, |
| InternalType[] accumulatorTypes, |
| InternalType[] aggResultTypes, |
| InternalType[] windowPropertyTypes) { |
| |
| this.accumulatorTypes = accumulatorTypes; |
| this.aggResultTypes = aggResultTypes; |
| this.windowPropertyTypes = windowPropertyTypes; |
| this.generatedAggregateFunction = checkNotNull(generatedAggregateFunction); |
| this.generatedEqualiser = checkNotNull(generatedEqualiser); |
| return this; |
| } |
| |
| public WindowOperatorBuilder withSendRetraction() { |
| this.sendRetraction = true; |
| return this; |
| } |
| |
| public WindowOperator build() { |
| checkNotNull(trigger, "trigger is not set"); |
| if (generatedAggregateFunction != null && generatedEqualiser != null) { |
| //noinspection unchecked |
| return new WindowOperator( |
| generatedAggregateFunction, |
| generatedEqualiser, |
| windowAssigner, |
| trigger, |
| windowAssigner.getWindowSerializer(new ExecutionConfig()), |
| inputFieldTypes, |
| accumulatorTypes, |
| aggResultTypes, |
| windowPropertyTypes, |
| rowtimeIndex, |
| sendRetraction, |
| allowedLateness); |
| } else { |
| //noinspection unchecked |
| return new WindowOperator( |
| aggregateFunction, |
| equaliser, |
| windowAssigner, |
| trigger, |
| windowAssigner.getWindowSerializer(new ExecutionConfig()), |
| inputFieldTypes, |
| accumulatorTypes, |
| aggResultTypes, |
| windowPropertyTypes, |
| rowtimeIndex, |
| sendRetraction, |
| allowedLateness); |
| } |
| } |
| } |