blob: 7c1063988996ceac6292168e516be350406d932a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.window.assigners;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.dataformat.BaseRow;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.NavigableSet;
/**
* A {@link WindowAssigner} that windows elements into sessions based on the timestamp.
* Windows cannot overlap.
*/
public class SessionWindowAssigner extends MergingWindowAssigner<TimeWindow> implements InternalTimeWindowAssigner {
private static final long serialVersionUID = -2595385378968688228L;
private final long sessionGap;
private final boolean isEventTime;
protected SessionWindowAssigner(long sessionGap, boolean isEventTime) {
if (sessionGap <= 0) {
throw new IllegalArgumentException("SessionWindowAssigner parameters must satisfy 0 < size");
}
this.sessionGap = sessionGap;
this.isEventTime = isEventTime;
}
@Override
public Collection<TimeWindow> assignWindows(BaseRow element, long timestamp) {
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionGap));
}
@Override
public void mergeWindows(TimeWindow newWindow, NavigableSet<TimeWindow> sortedWindows, MergeCallback<TimeWindow> callback) {
TimeWindow ceiling = sortedWindows.ceiling(newWindow);
TimeWindow floor = sortedWindows.floor(newWindow);
Collection<TimeWindow> mergedWindows = new HashSet<>();
TimeWindow mergeResult = newWindow;
if (ceiling != null) {
mergeResult = mergeWindow(mergeResult, ceiling, mergedWindows);
}
if (floor != null) {
mergeResult = mergeWindow(mergeResult, floor, mergedWindows);
}
if (!mergedWindows.isEmpty()) {
// merge happens, add newWindow into the collection as well.
mergedWindows.add(newWindow);
callback.merge(mergeResult, mergedWindows);
}
}
/**
* Merge curWindow and other, return a new window which covers curWindow and other
* if they are overlapped. Otherwise, returns the curWindow itself. */
private TimeWindow mergeWindow(TimeWindow curWindow, TimeWindow other, Collection<TimeWindow> mergedWindow) {
if (curWindow.intersects(other)) {
mergedWindow.add(other);
return curWindow.cover(other);
} else {
return curWindow;
}
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return isEventTime;
}
@Override
public String toString() {
return "SessionWindow(" + sessionGap + ")";
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
/**
* Creates a new {@code SessionWindowAssigner} {@link WindowAssigner} that assigns
* elements to sessions based on the timestamp.
*
* @param size The session timeout, i.e. the time gap between sessions
* @return The policy.
*/
public static SessionWindowAssigner withGap(Duration size) {
return new SessionWindowAssigner(size.toMillis(), true);
}
public SessionWindowAssigner withEventTime() {
return new SessionWindowAssigner(sessionGap, true);
}
public SessionWindowAssigner withProcessingTime() {
return new SessionWindowAssigner(sessionGap, false);
}
}