blob: ab6b4278d322307f619024a2f1a30d2e969bed06 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.values;
import java.io.Serializable;
import java.util.Objects;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.joda.time.Duration;
/**
* A {@code WindowingStrategy} describes the windowing behavior for a specific collection of values.
* It has both a {@link WindowFn} describing how elements are assigned to windows and a {@link
* Trigger} that controls when output is produced for each window.
*
* @param <T> type of elements being windowed
* @param <W> {@link BoundedWindow} subclass used to represent the windows used by this {@code
* WindowingStrategy}
*/
public class WindowingStrategy<T, W extends BoundedWindow> implements Serializable {
/** The accumulation modes that can be used with windowing. */
public enum AccumulationMode {
DISCARDING_FIRED_PANES,
ACCUMULATING_FIRED_PANES
}
private static final Duration DEFAULT_ALLOWED_LATENESS = Duration.ZERO;
private static final WindowingStrategy<Object, GlobalWindow> DEFAULT = of(new GlobalWindows());
private final WindowFn<T, W> windowFn;
private final Trigger trigger;
private final AccumulationMode mode;
private final Duration allowedLateness;
private final ClosingBehavior closingBehavior;
private final OnTimeBehavior onTimeBehavior;
private final TimestampCombiner timestampCombiner;
private final boolean triggerSpecified;
private final boolean modeSpecified;
private final boolean allowedLatenessSpecified;
private final boolean timestampCombinerSpecified;
private WindowingStrategy(
WindowFn<T, W> windowFn,
Trigger trigger,
boolean triggerSpecified,
AccumulationMode mode,
boolean modeSpecified,
Duration allowedLateness,
boolean allowedLatenessSpecified,
TimestampCombiner timestampCombiner,
boolean timestampCombinerSpecified,
ClosingBehavior closingBehavior,
OnTimeBehavior onTimeBehavior) {
this.windowFn = windowFn;
this.trigger = trigger;
this.triggerSpecified = triggerSpecified;
this.mode = mode;
this.modeSpecified = modeSpecified;
this.allowedLateness = allowedLateness;
this.allowedLatenessSpecified = allowedLatenessSpecified;
this.closingBehavior = closingBehavior;
this.onTimeBehavior = onTimeBehavior;
this.timestampCombiner = timestampCombiner;
this.timestampCombinerSpecified = timestampCombinerSpecified;
}
/** Return a fully specified, default windowing strategy. */
public static WindowingStrategy<Object, GlobalWindow> globalDefault() {
return DEFAULT;
}
public static <T, W extends BoundedWindow> WindowingStrategy<T, W> of(WindowFn<T, W> windowFn) {
return new WindowingStrategy<>(
windowFn,
DefaultTrigger.of(),
false,
AccumulationMode.DISCARDING_FIRED_PANES,
false,
DEFAULT_ALLOWED_LATENESS,
false,
TimestampCombiner.END_OF_WINDOW,
false,
ClosingBehavior.FIRE_IF_NON_EMPTY,
OnTimeBehavior.FIRE_ALWAYS);
}
public WindowFn<T, W> getWindowFn() {
return windowFn;
}
public Trigger getTrigger() {
return trigger;
}
public boolean isTriggerSpecified() {
return triggerSpecified;
}
public Duration getAllowedLateness() {
return allowedLateness;
}
public boolean isAllowedLatenessSpecified() {
return allowedLatenessSpecified;
}
public AccumulationMode getMode() {
return mode;
}
public boolean isModeSpecified() {
return modeSpecified;
}
public ClosingBehavior getClosingBehavior() {
return closingBehavior;
}
public OnTimeBehavior getOnTimeBehavior() {
return onTimeBehavior;
}
public TimestampCombiner getTimestampCombiner() {
return timestampCombiner;
}
public boolean isTimestampCombinerSpecified() {
return timestampCombinerSpecified;
}
/**
* Returns a {@link WindowingStrategy} identical to {@code this} but with the trigger set to
* {@code wildcardTrigger}.
*/
public WindowingStrategy<T, W> withTrigger(Trigger trigger) {
return new WindowingStrategy<>(
windowFn,
trigger,
true,
mode,
modeSpecified,
allowedLateness,
allowedLatenessSpecified,
timestampCombiner,
timestampCombinerSpecified,
closingBehavior,
onTimeBehavior);
}
/**
* Returns a {@link WindowingStrategy} identical to {@code this} but with the accumulation mode
* set to {@code mode}.
*/
public WindowingStrategy<T, W> withMode(AccumulationMode mode) {
return new WindowingStrategy<>(
windowFn,
trigger,
triggerSpecified,
mode,
true,
allowedLateness,
allowedLatenessSpecified,
timestampCombiner,
timestampCombinerSpecified,
closingBehavior,
onTimeBehavior);
}
/**
* Returns a {@link WindowingStrategy} identical to {@code this} but with the window function set
* to {@code wildcardWindowFn}.
*/
public WindowingStrategy<T, W> withWindowFn(WindowFn<?, ?> wildcardWindowFn) {
@SuppressWarnings("unchecked")
WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) wildcardWindowFn;
return new WindowingStrategy<>(
typedWindowFn,
trigger,
triggerSpecified,
mode,
modeSpecified,
allowedLateness,
allowedLatenessSpecified,
timestampCombiner,
timestampCombinerSpecified,
closingBehavior,
onTimeBehavior);
}
/**
* Returns a {@link WindowingStrategy} identical to {@code this} but with the allowed lateness set
* to {@code allowedLateness}.
*/
public WindowingStrategy<T, W> withAllowedLateness(Duration allowedLateness) {
return new WindowingStrategy<>(
windowFn,
trigger,
triggerSpecified,
mode,
modeSpecified,
allowedLateness,
true,
timestampCombiner,
timestampCombinerSpecified,
closingBehavior,
onTimeBehavior);
}
public WindowingStrategy<T, W> withClosingBehavior(ClosingBehavior closingBehavior) {
return new WindowingStrategy<>(
windowFn,
trigger,
triggerSpecified,
mode,
modeSpecified,
allowedLateness,
allowedLatenessSpecified,
timestampCombiner,
timestampCombinerSpecified,
closingBehavior,
onTimeBehavior);
}
public WindowingStrategy<T, W> withOnTimeBehavior(OnTimeBehavior onTimeBehavior) {
return new WindowingStrategy<>(
windowFn,
trigger,
triggerSpecified,
mode,
modeSpecified,
allowedLateness,
allowedLatenessSpecified,
timestampCombiner,
timestampCombinerSpecified,
closingBehavior,
onTimeBehavior);
}
@Experimental(Experimental.Kind.OUTPUT_TIME)
public WindowingStrategy<T, W> withTimestampCombiner(TimestampCombiner timestampCombiner) {
return new WindowingStrategy<>(
windowFn,
trigger,
triggerSpecified,
mode,
modeSpecified,
allowedLateness,
allowedLatenessSpecified,
timestampCombiner,
true,
closingBehavior,
onTimeBehavior);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("windowFn", windowFn)
.add("allowedLateness", allowedLateness)
.add("trigger", trigger)
.add("accumulationMode", mode)
.add("timestampCombiner", timestampCombiner)
.toString();
}
@Override
public boolean equals(Object object) {
if (!(object instanceof WindowingStrategy)) {
return false;
}
WindowingStrategy<?, ?> other = (WindowingStrategy<?, ?>) object;
return isAllowedLatenessSpecified() == other.isAllowedLatenessSpecified()
&& isModeSpecified() == other.isModeSpecified()
&& isTimestampCombinerSpecified() == other.isTimestampCombinerSpecified()
&& getMode().equals(other.getMode())
&& getAllowedLateness().equals(other.getAllowedLateness())
&& getClosingBehavior().equals(other.getClosingBehavior())
&& getOnTimeBehavior().equals(other.getOnTimeBehavior())
&& getTrigger().equals(other.getTrigger())
&& getTimestampCombiner().equals(other.getTimestampCombiner())
&& getWindowFn().equals(other.getWindowFn());
}
@Override
public int hashCode() {
return Objects.hash(
allowedLatenessSpecified,
modeSpecified,
timestampCombinerSpecified,
mode,
allowedLateness,
closingBehavior,
trigger,
timestampCombiner,
windowFn);
}
/**
* Fixes all the defaults so that equals can be used to check that two strategies are the same,
* regardless of the state of "defaulted-ness".
*/
@VisibleForTesting
public WindowingStrategy<T, W> fixDefaults() {
return new WindowingStrategy<>(
windowFn,
trigger,
true,
mode,
true,
allowedLateness,
true,
timestampCombiner,
true,
closingBehavior,
onTimeBehavior);
}
}