blob: 84b1253e26f9a25b66b71c842933c92a6a3d06ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CollectionCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
/**
* An immutable triple of value, timestamp, and windows.
*
* @param <T> the type of the value
*/
/**
 * An immutable quadruple of value, timestamp, windows, and pane.
 *
 * <p>Instances are created through the static factories on this class, which choose a compact
 * internal representation depending on whether the value is in the {@link GlobalWindow}, whether
 * its timestamp is {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and how many windows it is in.
 *
 * @param <T> the type of the value
 */
public abstract class WindowedValue<T> {
  /**
   * Returns a {@code WindowedValue} with the given value, timestamp, windows, and pane.
   *
   * @throws IllegalArgumentException if {@code pane} is null or {@code windows} is empty
   */
  public static <T> WindowedValue<T> of(
      T value, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
    checkArgument(pane != null, "WindowedValue requires PaneInfo, but it was null");
    checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none");
    if (windows.size() == 1) {
      return of(value, timestamp, windows.iterator().next(), pane);
    } else {
      return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane);
    }
  }

  /**
   * Like {@link #of(Object, Instant, Collection, PaneInfo)}, but does not reject an empty {@code
   * windows} collection.
   *
   * @deprecated for use only in compatibility with old broken code
   */
  @Deprecated
  static <T> WindowedValue<T> createWithoutValidation(
      T value, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
    if (windows.size() == 1) {
      return of(value, timestamp, windows.iterator().next(), pane);
    } else {
      return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane);
    }
  }

  /**
   * Returns a {@code WindowedValue} with the given value, timestamp, window, and pane.
   *
   * @throws IllegalArgumentException if {@code pane} is null
   */
  public static <T> WindowedValue<T> of(
      T value, Instant timestamp, BoundedWindow window, PaneInfo pane) {
    checkArgument(pane != null, "WindowedValue requires PaneInfo, but it was null");
    boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
    if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {
      return valueInGlobalWindow(value, pane);
    } else if (isGlobal) {
      return new TimestampedValueInGlobalWindow<>(value, timestamp, pane);
    } else {
      return new TimestampedValueInSingleWindow<>(value, timestamp, window, pane);
    }
  }

  /**
   * Returns a {@code WindowedValue} with the given value in the {@link GlobalWindow} using the
   * default timestamp and pane.
   */
  public static <T> WindowedValue<T> valueInGlobalWindow(T value) {
    return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING);
  }

  /**
   * Returns a {@code WindowedValue} with the given value in the {@link GlobalWindow} using the
   * default timestamp and the specified pane.
   */
  public static <T> WindowedValue<T> valueInGlobalWindow(T value, PaneInfo pane) {
    return new ValueInGlobalWindow<>(value, pane);
  }

  /**
   * Returns a {@code WindowedValue} with the given value and timestamp, {@code GlobalWindow} and
   * default pane.
   */
  public static <T> WindowedValue<T> timestampedValueInGlobalWindow(T value, Instant timestamp) {
    if (BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {
      return valueInGlobalWindow(value);
    } else {
      return new TimestampedValueInGlobalWindow<>(value, timestamp, PaneInfo.NO_FIRING);
    }
  }

  /**
   * Returns a new {@code WindowedValue} that is a copy of this one, but with a different value,
   * which may have a new type {@code NewT}.
   */
  public abstract <NewT> WindowedValue<NewT> withValue(NewT value);

  /** Returns the value of this {@code WindowedValue}. */
  public abstract T getValue();

  /** Returns the timestamp of this {@code WindowedValue}. */
  public abstract Instant getTimestamp();

  /** Returns the windows of this {@code WindowedValue}. */
  public abstract Collection<? extends BoundedWindow> getWindows();

  /** Returns the pane of this {@code WindowedValue} in its window. */
  public abstract PaneInfo getPane();

  /** Returns {@code true} if this WindowedValue has exactly one window. */
  public boolean isSingleWindowedValue() {
    return false;
  }

  /**
   * Returns a collection of {@link WindowedValue WindowedValues} identical to this one, except each
   * is in exactly one of the windows that this {@link WindowedValue} is in.
   */
  public Iterable<WindowedValue<T>> explodeWindows() {
    ImmutableList.Builder<WindowedValue<T>> windowedValues = ImmutableList.builder();
    for (BoundedWindow w : getWindows()) {
      windowedValues.add(of(getValue(), getTimestamp(), w, getPane()));
    }
    return windowedValues.build();
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof WindowedValue)) {
      return false;
    } else {
      WindowedValue<?> that = (WindowedValue<?>) other;
      // Compare timestamps first as they are most likely to differ.
      // Also compare timestamps according to millis-since-epoch because otherwise expensive
      // comparisons are made on their Chronology objects.
      return this.getTimestamp().isEqual(that.getTimestamp())
          && Objects.equals(this.getValue(), that.getValue())
          && Objects.equals(this.getWindows(), that.getWindows())
          && Objects.equals(this.getPane(), that.getPane());
    }
  }

  @Override
  public int hashCode() {
    // Hash only the millis of the timestamp to be consistent with equals
    return Objects.hash(getValue(), getTimestamp().getMillis(), getWindows(), getPane());
  }

  @Override
  public abstract String toString();

  /** Shared windows collection for every representation that lives in the global window. */
  private static final Collection<? extends BoundedWindow> GLOBAL_WINDOWS =
      Collections.singletonList(GlobalWindow.INSTANCE);

  /** Implemented by {@link WindowedValue} representations that hold exactly one window. */
  public interface SingleWindowedValue {
    /** @return the single window associated with this value. */
    BoundedWindow getWindow();
  }

  /**
   * An abstract superclass for implementations of {@link WindowedValue} that stores the value and
   * pane info.
   */
  private abstract static class SimpleWindowedValue<T> extends WindowedValue<T> {
    private final T value;
    private final PaneInfo pane;

    protected SimpleWindowedValue(T value, PaneInfo pane) {
      this.value = value;
      this.pane = checkNotNull(pane);
    }

    @Override
    public PaneInfo getPane() {
      return pane;
    }

    @Override
    public T getValue() {
      return value;
    }
  }

  /** The abstract superclass of WindowedValue representations where timestamp == MIN. */
  private abstract static class MinTimestampWindowedValue<T> extends SimpleWindowedValue<T> {
    public MinTimestampWindowedValue(T value, PaneInfo pane) {
      super(value, pane);
    }

    @Override
    public Instant getTimestamp() {
      return BoundedWindow.TIMESTAMP_MIN_VALUE;
    }
  }

  /** The representation of a WindowedValue where timestamp == MIN and windows == {GlobalWindow}. */
  private static class ValueInGlobalWindow<T> extends MinTimestampWindowedValue<T>
      implements SingleWindowedValue {
    public ValueInGlobalWindow(T value, PaneInfo pane) {
      super(value, pane);
    }

    @Override
    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
      return new ValueInGlobalWindow<>(newValue, getPane());
    }

    @Override
    public Collection<? extends BoundedWindow> getWindows() {
      return GLOBAL_WINDOWS;
    }

    @Override
    public boolean isSingleWindowedValue() {
      return true;
    }

    @Override
    public BoundedWindow getWindow() {
      return GlobalWindow.INSTANCE;
    }

    @Override
    public boolean equals(Object o) {
      if (o instanceof ValueInGlobalWindow) {
        ValueInGlobalWindow<?> that = (ValueInGlobalWindow<?>) o;
        return Objects.equals(that.getPane(), this.getPane())
            && Objects.equals(that.getValue(), this.getValue());
      } else {
        return super.equals(o);
      }
    }

    @Override
    public int hashCode() {
      return Objects.hash(getValue(), getPane());
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(getClass())
          .add("value", getValue())
          .add("pane", getPane())
          .toString();
    }
  }

  /** The abstract superclass of WindowedValue representations where timestamp is arbitrary. */
  private abstract static class TimestampedWindowedValue<T> extends SimpleWindowedValue<T> {
    private final Instant timestamp;

    public TimestampedWindowedValue(T value, Instant timestamp, PaneInfo pane) {
      super(value, pane);
      this.timestamp = checkNotNull(timestamp);
    }

    @Override
    public Instant getTimestamp() {
      return timestamp;
    }
  }

  /**
   * The representation of a WindowedValue where timestamp {@code >} MIN and windows ==
   * {GlobalWindow}.
   */
  private static class TimestampedValueInGlobalWindow<T> extends TimestampedWindowedValue<T>
      implements SingleWindowedValue {
    public TimestampedValueInGlobalWindow(T value, Instant timestamp, PaneInfo pane) {
      super(value, timestamp, pane);
    }

    @Override
    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
      return new TimestampedValueInGlobalWindow<>(newValue, getTimestamp(), getPane());
    }

    @Override
    public Collection<? extends BoundedWindow> getWindows() {
      return GLOBAL_WINDOWS;
    }

    @Override
    public boolean isSingleWindowedValue() {
      return true;
    }

    @Override
    public BoundedWindow getWindow() {
      return GlobalWindow.INSTANCE;
    }

    @Override
    public boolean equals(Object o) {
      if (o instanceof TimestampedValueInGlobalWindow) {
        TimestampedValueInGlobalWindow<?> that = (TimestampedValueInGlobalWindow<?>) o;
        // Compare timestamps first as they are most likely to differ.
        // Also compare timestamps according to millis-since-epoch because otherwise expensive
        // comparisons are made on their Chronology objects.
        return this.getTimestamp().isEqual(that.getTimestamp())
            && Objects.equals(that.getPane(), this.getPane())
            && Objects.equals(that.getValue(), this.getValue());
      } else {
        return super.equals(o);
      }
    }

    @Override
    public int hashCode() {
      // Hash only the millis of the timestamp to be consistent with equals
      return Objects.hash(getValue(), getPane(), getTimestamp().getMillis());
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(getClass())
          .add("value", getValue())
          .add("timestamp", getTimestamp())
          .add("pane", getPane())
          .toString();
    }
  }

  /**
   * The representation of a WindowedValue where timestamp is arbitrary and windows == a single
   * non-Global window.
   */
  private static class TimestampedValueInSingleWindow<T> extends TimestampedWindowedValue<T>
      implements SingleWindowedValue {
    private final BoundedWindow window;

    public TimestampedValueInSingleWindow(
        T value, Instant timestamp, BoundedWindow window, PaneInfo pane) {
      super(value, timestamp, pane);
      this.window = checkNotNull(window);
    }

    @Override
    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
      return new TimestampedValueInSingleWindow<>(newValue, getTimestamp(), window, getPane());
    }

    @Override
    public Collection<? extends BoundedWindow> getWindows() {
      return Collections.singletonList(window);
    }

    @Override
    public boolean isSingleWindowedValue() {
      return true;
    }

    @Override
    public BoundedWindow getWindow() {
      return window;
    }

    @Override
    public boolean equals(Object o) {
      if (o instanceof TimestampedValueInSingleWindow) {
        TimestampedValueInSingleWindow<?> that = (TimestampedValueInSingleWindow<?>) o;
        // Compare timestamps first as they are most likely to differ.
        // Also compare timestamps according to millis-since-epoch because otherwise expensive
        // comparisons are made on their Chronology objects.
        return this.getTimestamp().isEqual(that.getTimestamp())
            && Objects.equals(that.getValue(), this.getValue())
            && Objects.equals(that.getPane(), this.getPane())
            && Objects.equals(that.window, this.window);
      } else {
        return super.equals(o);
      }
    }

    @Override
    public int hashCode() {
      // Hash only the millis of the timestamp to be consistent with equals
      return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), window);
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(getClass())
          .add("value", getValue())
          .add("timestamp", getTimestamp())
          .add("window", window)
          .add("pane", getPane())
          .toString();
    }
  }

  /**
   * The representation of a WindowedValue, excluding the special cases captured above.
   *
   * <p>{@link #getWindows()} always returns the collection supplied at construction, unchanged
   * (duplicates and iteration order included). Equality and hashing between two instances of this
   * class compare the windows as a set.
   */
  private static class TimestampedValueInMultipleWindows<T> extends TimestampedWindowedValue<T> {
    private final Collection<? extends BoundedWindow> windows;

    /**
     * Set copy of {@link #windows}, computed on first use by {@link #windowSet()} and used only by
     * {@link #equals} and {@link #hashCode}. It is kept separate from {@code windows} so that
     * comparing or hashing this value never changes what {@link #getWindows()} returns (in
     * particular, never drops duplicate windows). Volatile so a set built by one thread is safely
     * published to others; a race on first use only computes the set more than once.
     */
    private volatile Set<?> cachedWindowSet;

    public TimestampedValueInMultipleWindows(
        T value, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
      super(value, timestamp, pane);
      this.windows = checkNotNull(windows);
    }

    @Override
    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
      return new TimestampedValueInMultipleWindows<>(newValue, getTimestamp(), windows, getPane());
    }

    @Override
    public Collection<? extends BoundedWindow> getWindows() {
      return windows;
    }

    @Override
    public boolean equals(Object o) {
      if (o instanceof TimestampedValueInMultipleWindows) {
        TimestampedValueInMultipleWindows<?> that = (TimestampedValueInMultipleWindows<?>) o;
        // Compare timestamps first as they are most likely to differ.
        // Also compare timestamps according to millis-since-epoch because otherwise expensive
        // comparisons are made on their Chronology objects.
        return this.getTimestamp().isEqual(that.getTimestamp())
            && Objects.equals(that.getValue(), this.getValue())
            && Objects.equals(that.getPane(), this.getPane())
            && that.windowSet().equals(this.windowSet());
      } else {
        return super.equals(o);
      }
    }

    @Override
    public int hashCode() {
      // Hash only the millis of the timestamp to be consistent with equals, and hash the windows
      // as a set to be consistent with the set comparison in equals.
      return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), windowSet());
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(getClass())
          .add("value", getValue())
          .add("timestamp", getTimestamp())
          .add("windows", windows)
          .add("pane", getPane())
          .toString();
    }

    /**
     * Returns {@link #windows} as a set, without modifying {@code windows}. The set is computed on
     * first use and cached.
     */
    private Set<?> windowSet() {
      Set<?> result = cachedWindowSet;
      if (result == null) {
        if (windows instanceof Set) {
          result = (Set<?>) windows;
        } else {
          // LinkedHashSet keeps the original iteration order while removing duplicates.
          result = new LinkedHashSet<>(windows);
        }
        cachedWindowSet = result;
      }
      return result;
    }
  }

  /////////////////////////////////////////////////////////////////////////////

  /**
   * Returns the {@code Coder} to use for a {@code WindowedValue<T>}, using the given valueCoder and
   * windowCoder.
   */
  public static <T> FullWindowedValueCoder<T> getFullCoder(
      Coder<T> valueCoder, Coder<? extends BoundedWindow> windowCoder) {
    return FullWindowedValueCoder.of(valueCoder, windowCoder);
  }

  /** Returns the {@code ValueOnlyCoder} from the given valueCoder. */
  public static <T> ValueOnlyWindowedValueCoder<T> getValueOnlyCoder(Coder<T> valueCoder) {
    return ValueOnlyWindowedValueCoder.of(valueCoder);
  }

  /** Abstract class for {@code WindowedValue} coder. */
  public abstract static class WindowedValueCoder<T> extends StructuredCoder<WindowedValue<T>> {
    final Coder<T> valueCoder;

    WindowedValueCoder(Coder<T> valueCoder) {
      this.valueCoder = checkNotNull(valueCoder);
    }

    /** Returns the value coder. */
    public Coder<T> getValueCoder() {
      return valueCoder;
    }

    /**
     * Returns a new {@code WindowedValueCoder} that is a copy of this one, but with a different
     * value coder.
     */
    public abstract <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder);
  }

  /**
   * Coder for {@code WindowedValue}.
   *
   * <p>A {@code FullWindowedValueCoder} encodes the timestamp, the windows, the pane, and then the
   * value, in that order.
   */
  public static class FullWindowedValueCoder<T> extends WindowedValueCoder<T> {
    private final Coder<? extends BoundedWindow> windowCoder;
    // Precompute and cache the coder for a list of windows.
    private final Coder<Collection<? extends BoundedWindow>> windowsCoder;

    /** Returns a {@code FullWindowedValueCoder} using the given value and window coders. */
    public static <T> FullWindowedValueCoder<T> of(
        Coder<T> valueCoder, Coder<? extends BoundedWindow> windowCoder) {
      return new FullWindowedValueCoder<>(valueCoder, windowCoder);
    }

    FullWindowedValueCoder(Coder<T> valueCoder, Coder<? extends BoundedWindow> windowCoder) {
      super(valueCoder);
      this.windowCoder = checkNotNull(windowCoder);
      // It's not possible to statically type-check correct use of the
      // windowCoder (we have to ensure externally that we only get
      // windows of the class handled by windowCoder), so type
      // windowsCoder in a way that makes encode() and decode() work
      // right, and cast the window type away here.
      @SuppressWarnings({"unchecked", "rawtypes"})
      Coder<Collection<? extends BoundedWindow>> collectionCoder =
          (Coder) CollectionCoder.of(this.windowCoder);
      this.windowsCoder = collectionCoder;
    }

    /** Returns the coder used for each individual window. */
    public Coder<? extends BoundedWindow> getWindowCoder() {
      return windowCoder;
    }

    /** Returns the coder used for the collection of windows, built from the window coder. */
    public Coder<Collection<? extends BoundedWindow>> getWindowsCoder() {
      return windowsCoder;
    }

    @Override
    public <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder) {
      return new FullWindowedValueCoder<>(valueCoder, windowCoder);
    }

    @Override
    public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
        throws CoderException, IOException {
      encode(windowedElem, outStream, Context.NESTED);
    }

    @Override
    public void encode(WindowedValue<T> windowedElem, OutputStream outStream, Context context)
        throws CoderException, IOException {
      InstantCoder.of().encode(windowedElem.getTimestamp(), outStream);
      windowsCoder.encode(windowedElem.getWindows(), outStream);
      PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream);
      valueCoder.encode(windowedElem.getValue(), outStream, context);
    }

    @Override
    public WindowedValue<T> decode(InputStream inStream) throws CoderException, IOException {
      return decode(inStream, Context.NESTED);
    }

    @Override
    public WindowedValue<T> decode(InputStream inStream, Context context)
        throws CoderException, IOException {
      Instant timestamp = InstantCoder.of().decode(inStream);
      Collection<? extends BoundedWindow> windows = windowsCoder.decode(inStream);
      PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream);
      T value = valueCoder.decode(inStream, context);
      // Because there are some remaining (incorrect) uses of WindowedValue with no windows,
      // we call this deprecated no-validation path when decoding
      return WindowedValue.createWithoutValidation(value, timestamp, windows, pane);
    }

    @Override
    public void verifyDeterministic() throws NonDeterministicException {
      verifyDeterministic(
          this, "FullWindowedValueCoder requires a deterministic valueCoder", valueCoder);
      verifyDeterministic(
          this, "FullWindowedValueCoder requires a deterministic windowCoder", windowCoder);
    }

    @Override
    public void registerByteSizeObserver(WindowedValue<T> value, ElementByteSizeObserver observer)
        throws Exception {
      InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer);
      windowsCoder.registerByteSizeObserver(value.getWindows(), observer);
      PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer);
      valueCoder.registerByteSizeObserver(value.getValue(), observer);
    }

    /**
     * {@inheritDoc}.
     *
     * @return a singleton list containing the {@code valueCoder} of this {@link
     *     FullWindowedValueCoder}.
     */
    @Override
    public List<? extends Coder<?>> getCoderArguments() {
      // The value type is the only generic type parameter exposed by this coder. The component
      // coders include the window coder as well
      return Collections.singletonList(valueCoder);
    }

    @Override
    public List<? extends Coder<?>> getComponents() {
      return Arrays.asList(valueCoder, windowCoder);
    }
  }

  /**
   * Coder for {@code WindowedValue}.
   *
   * <p>A {@code ValueOnlyWindowedValueCoder} only encodes and decodes the value. It drops the
   * timestamp, windows, and pane when encoding, and decodes every value into the {@link
   * GlobalWindow} with the default timestamp and pane (as {@link
   * WindowedValue#valueInGlobalWindow(Object)} does).
   */
  public static class ValueOnlyWindowedValueCoder<T> extends WindowedValueCoder<T> {
    /** Returns a {@code ValueOnlyWindowedValueCoder} using the given value coder. */
    public static <T> ValueOnlyWindowedValueCoder<T> of(Coder<T> valueCoder) {
      return new ValueOnlyWindowedValueCoder<>(valueCoder);
    }

    ValueOnlyWindowedValueCoder(Coder<T> valueCoder) {
      super(valueCoder);
    }

    @Override
    public <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder) {
      return new ValueOnlyWindowedValueCoder<>(valueCoder);
    }

    @Override
    public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
        throws CoderException, IOException {
      encode(windowedElem, outStream, Context.NESTED);
    }

    @Override
    public void encode(WindowedValue<T> windowedElem, OutputStream outStream, Context context)
        throws CoderException, IOException {
      valueCoder.encode(windowedElem.getValue(), outStream, context);
    }

    @Override
    public WindowedValue<T> decode(InputStream inStream) throws CoderException, IOException {
      return decode(inStream, Context.NESTED);
    }

    @Override
    public WindowedValue<T> decode(InputStream inStream, Context context)
        throws CoderException, IOException {
      T value = valueCoder.decode(inStream, context);
      return WindowedValue.valueInGlobalWindow(value);
    }

    @Override
    public void verifyDeterministic() throws NonDeterministicException {
      verifyDeterministic(
          this, "ValueOnlyWindowedValueCoder requires a deterministic valueCoder", valueCoder);
    }

    @Override
    public void registerByteSizeObserver(WindowedValue<T> value, ElementByteSizeObserver observer)
        throws Exception {
      valueCoder.registerByteSizeObserver(value.getValue(), observer);
    }

    @Override
    public List<? extends Coder<?>> getCoderArguments() {
      return Collections.singletonList(valueCoder);
    }
  }
}