blob: 5085eb9f7964d164bcf63b7818ef15ffd1f9cab9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
import java.util.Collection;
import org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.spark.api.java.function.MapFunction;
import org.joda.time.Instant;
/** Helper functions for working with windows. */
public final class WindowingHelpers {
/**
* Checks if the window transformation should be applied or skipped.
*
* <p>Avoid running assign windows if both source and destination are global window or if the user
* has not specified the WindowFn (meaning they are just messing with triggering or allowed
* lateness).
*/
@SuppressWarnings("unchecked")
public static <T, W extends BoundedWindow> boolean skipAssignWindows(
Window.Assign<T> transform, AbstractTranslationContext context) {
WindowFn<? super T, W> windowFnToApply = (WindowFn<? super T, W>) transform.getWindowFn();
PCollection<T> input = (PCollection<T>) context.getInput();
WindowFn<?, ?> windowFnOfInput = input.getWindowingStrategy().getWindowFn();
return windowFnToApply == null
|| (windowFnOfInput instanceof GlobalWindows && windowFnToApply instanceof GlobalWindows);
}
public static <T, W extends BoundedWindow>
MapFunction<WindowedValue<T>, WindowedValue<T>> assignWindowsMapFunction(
WindowFn<T, W> windowFn) {
return (MapFunction<WindowedValue<T>, WindowedValue<T>>)
windowedValue -> {
final BoundedWindow boundedWindow = Iterables.getOnlyElement(windowedValue.getWindows());
final T element = windowedValue.getValue();
final Instant timestamp = windowedValue.getTimestamp();
Collection<W> windows =
windowFn.assignWindows(
windowFn.new AssignContext() {
@Override
public T element() {
return element;
}
@Override
public Instant timestamp() {
return timestamp;
}
@Override
public BoundedWindow window() {
return boundedWindow;
}
});
return WindowedValue.of(element, timestamp, windows, windowedValue.getPane());
};
}
}