| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.util; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.Set; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.CoderException; |
| import org.apache.beam.sdk.coders.CollectionCoder; |
| import org.apache.beam.sdk.coders.InstantCoder; |
| import org.apache.beam.sdk.coders.StructuredCoder; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.GlobalWindow; |
| import org.apache.beam.sdk.transforms.windowing.PaneInfo; |
| import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; |
| import org.apache.beam.sdk.util.common.ElementByteSizeObserver; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.joda.time.Instant; |
| |
| /** |
| * An immutable tuple of value, timestamp, windows, and pane. |
| * |
| * @param <T> the type of the value |
| */ |
| public abstract class WindowedValue<T> { |
| |
| /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ |
| public static <T> WindowedValue<T> of( |
| T value, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { |
| checkArgument(pane != null, "WindowedValue requires PaneInfo, but it was null"); |
| checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none"); |
| |
| if (windows.size() == 1) { |
| return of(value, timestamp, windows.iterator().next(), pane); |
| } else { |
| return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane); |
| } |
| } |
| |
| /** @deprecated for use only in compatibility with old broken code */ |
| @Deprecated |
| static <T> WindowedValue<T> createWithoutValidation( |
| T value, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { |
| if (windows.size() == 1) { |
| return of(value, timestamp, windows.iterator().next(), pane); |
| } else { |
| return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane); |
| } |
| } |
| |
| /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ |
| public static <T> WindowedValue<T> of( |
| T value, Instant timestamp, BoundedWindow window, PaneInfo pane) { |
| checkArgument(pane != null, "WindowedValue requires PaneInfo, but it was null"); |
| |
| boolean isGlobal = GlobalWindow.INSTANCE.equals(window); |
| if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { |
| return valueInGlobalWindow(value, pane); |
| } else if (isGlobal) { |
| return new TimestampedValueInGlobalWindow<>(value, timestamp, pane); |
| } else { |
| return new TimestampedValueInSingleWindow<>(value, timestamp, window, pane); |
| } |
| } |
| |
| /** |
| * Returns a {@code WindowedValue} with the given value in the {@link GlobalWindow} using the |
| * default timestamp and pane. |
| */ |
| public static <T> WindowedValue<T> valueInGlobalWindow(T value) { |
| return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING); |
| } |
| |
| /** |
| * Returns a {@code WindowedValue} with the given value in the {@link GlobalWindow} using the |
| * default timestamp and the specified pane. |
| */ |
| public static <T> WindowedValue<T> valueInGlobalWindow(T value, PaneInfo pane) { |
| return new ValueInGlobalWindow<>(value, pane); |
| } |
| |
| /** |
| * Returns a {@code WindowedValue} with the given value and timestamp, {@code GlobalWindow} and |
| * default pane. |
| */ |
| public static <T> WindowedValue<T> timestampedValueInGlobalWindow(T value, Instant timestamp) { |
| if (BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { |
| return valueInGlobalWindow(value); |
| } else { |
| return new TimestampedValueInGlobalWindow<>(value, timestamp, PaneInfo.NO_FIRING); |
| } |
| } |
| |
| /** |
| * Returns a new {@code WindowedValue} that is a copy of this one, but with a different value, |
| * which may have a new type {@code NewT}. |
| * |
| * <p>The implementations in this file carry the timestamp, window(s), and pane over unchanged. |
| * |
| * @param value the replacement value |
| * @return a new {@code WindowedValue} holding {@code value} |
| */ |
| public abstract <NewT> WindowedValue<NewT> withValue(NewT value); |
| |
| /** Returns the value of this {@code WindowedValue}. */ |
| public abstract T getValue(); |
| |
| /** Returns the timestamp of this {@code WindowedValue}. */ |
| public abstract Instant getTimestamp(); |
| |
| /** |
| * Returns the windows of this {@code WindowedValue}. |
| * |
| * <p>The multi-window implementation in this file hands back its internal collection rather |
| * than a copy. |
| */ |
| public abstract Collection<? extends BoundedWindow> getWindows(); |
| |
| /** Returns the pane of this {@code WindowedValue} in its window. */ |
| public abstract PaneInfo getPane(); |
| |
| /** |
| * Returns {@code true} if this WindowedValue has exactly one window. |
| * |
| * <p>This base implementation always returns {@code false}; the single-window representations |
| * in this file override it to return {@code true}. |
| */ |
| public boolean isSingleWindowedValue() { |
| return false; |
| } |
| |
| /** |
| * Returns a collection of {@link WindowedValue WindowedValues} identical to this one, except each |
| * is in exactly one of the windows that this {@link WindowedValue} is in. |
| */ |
| public Iterable<WindowedValue<T>> explodeWindows() { |
| ImmutableList.Builder<WindowedValue<T>> windowedValues = ImmutableList.builder(); |
| for (BoundedWindow w : getWindows()) { |
| windowedValues.add(of(getValue(), getTimestamp(), w, getPane())); |
| } |
| return windowedValues.build(); |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| if (!(other instanceof WindowedValue)) { |
| return false; |
| } else { |
| WindowedValue<?> that = (WindowedValue<?>) other; |
| |
| // Compare timestamps first as they are most likely to differ. |
| // Also compare timestamps according to millis-since-epoch because otherwise expensive |
| // comparisons are made on their Chronology objects. |
| return this.getTimestamp().isEqual(that.getTimestamp()) |
| && Objects.equals(this.getValue(), that.getValue()) |
| && Objects.equals(this.getWindows(), that.getWindows()) |
| && Objects.equals(this.getPane(), that.getPane()); |
| } |
| } |
| |
| @Override |
| public int hashCode() { |
| // Hash only the millis of the timestamp to be consistent with equals |
| return Objects.hash(getValue(), getTimestamp().getMillis(), getWindows(), getPane()); |
| } |
| |
| /** Re-declared abstract so that every concrete subclass must supply its own rendering. */ |
| @Override |
| public abstract String toString(); |
| |
| /** |
| * Shared singleton list containing only {@link GlobalWindow#INSTANCE}; returned from |
| * {@code getWindows()} by the global-window representations in this file. |
| */ |
| private static final Collection<? extends BoundedWindow> GLOBAL_WINDOWS = |
| Collections.singletonList(GlobalWindow.INSTANCE); |
| |
| /** |
| * Marker for a {@link WindowedValue} representation that holds exactly one window per value, |
| * giving direct access to that window. |
| */ |
| public interface SingleWindowedValue { |
| |
| /** @return the single window associated with this value. */ |
| BoundedWindow getWindow(); |
| } |
| |
| /** |
| * An abstract superclass for implementations of {@link WindowedValue} that stores the value and |
| * pane info. |
| */ |
| private abstract static class SimpleWindowedValue<T> extends WindowedValue<T> { |
| |
| private final T value; |
| private final PaneInfo pane; |
| |
| protected SimpleWindowedValue(T value, PaneInfo pane) { |
| this.value = value; |
| this.pane = checkNotNull(pane); |
| } |
| |
| @Override |
| public PaneInfo getPane() { |
| return pane; |
| } |
| |
| @Override |
| public T getValue() { |
| return value; |
| } |
| } |
| |
| /** The abstract superclass of WindowedValue representations where timestamp == MIN. */ |
| private abstract static class MinTimestampWindowedValue<T> extends SimpleWindowedValue<T> { |
| public MinTimestampWindowedValue(T value, PaneInfo pane) { |
| super(value, pane); |
| } |
| |
| @Override |
| public Instant getTimestamp() { |
| return BoundedWindow.TIMESTAMP_MIN_VALUE; |
| } |
| } |
| |
| /** The representation of a WindowedValue where timestamp == MIN and windows == {GlobalWindow}. */ |
| private static class ValueInGlobalWindow<T> extends MinTimestampWindowedValue<T> |
| implements SingleWindowedValue { |
| |
| public ValueInGlobalWindow(T value, PaneInfo pane) { |
| super(value, pane); |
| } |
| |
| @Override |
| public <NewT> WindowedValue<NewT> withValue(NewT newValue) { |
| return new ValueInGlobalWindow<>(newValue, getPane()); |
| } |
| |
| @Override |
| public Collection<? extends BoundedWindow> getWindows() { |
| return GLOBAL_WINDOWS; |
| } |
| |
| @Override |
| public boolean isSingleWindowedValue() { |
| return true; |
| } |
| |
| @Override |
| public BoundedWindow getWindow() { |
| return GlobalWindow.INSTANCE; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (o instanceof ValueInGlobalWindow) { |
| ValueInGlobalWindow<?> that = (ValueInGlobalWindow<?>) o; |
| return Objects.equals(that.getPane(), this.getPane()) |
| && Objects.equals(that.getValue(), this.getValue()); |
| } else { |
| return super.equals(o); |
| } |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(getValue(), getPane()); |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(getClass()) |
| .add("value", getValue()) |
| .add("pane", getPane()) |
| .toString(); |
| } |
| } |
| |
| /** The abstract superclass of WindowedValue representations where timestamp is arbitrary. */ |
| private abstract static class TimestampedWindowedValue<T> extends SimpleWindowedValue<T> { |
| private final Instant timestamp; |
| |
| public TimestampedWindowedValue(T value, Instant timestamp, PaneInfo pane) { |
| super(value, pane); |
| this.timestamp = checkNotNull(timestamp); |
| } |
| |
| @Override |
| public Instant getTimestamp() { |
| return timestamp; |
| } |
| } |
| |
| /** |
| * The representation of a WindowedValue where timestamp {@code >} MIN and windows == |
| * {GlobalWindow}. |
| */ |
| private static class TimestampedValueInGlobalWindow<T> extends TimestampedWindowedValue<T> |
| implements SingleWindowedValue { |
| |
| public TimestampedValueInGlobalWindow(T value, Instant timestamp, PaneInfo pane) { |
| super(value, timestamp, pane); |
| } |
| |
| @Override |
| public <NewT> WindowedValue<NewT> withValue(NewT newValue) { |
| return new TimestampedValueInGlobalWindow<>(newValue, getTimestamp(), getPane()); |
| } |
| |
| @Override |
| public Collection<? extends BoundedWindow> getWindows() { |
| return GLOBAL_WINDOWS; |
| } |
| |
| @Override |
| public boolean isSingleWindowedValue() { |
| return true; |
| } |
| |
| @Override |
| public BoundedWindow getWindow() { |
| return GlobalWindow.INSTANCE; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (o instanceof TimestampedValueInGlobalWindow) { |
| TimestampedValueInGlobalWindow<?> that = (TimestampedValueInGlobalWindow<?>) o; |
| // Compare timestamps first as they are most likely to differ. |
| // Also compare timestamps according to millis-since-epoch because otherwise expensive |
| // comparisons are made on their Chronology objects. |
| return this.getTimestamp().isEqual(that.getTimestamp()) |
| && Objects.equals(that.getPane(), this.getPane()) |
| && Objects.equals(that.getValue(), this.getValue()); |
| } else { |
| return super.equals(o); |
| } |
| } |
| |
| @Override |
| public int hashCode() { |
| // Hash only the millis of the timestamp to be consistent with equals |
| return Objects.hash(getValue(), getPane(), getTimestamp().getMillis()); |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(getClass()) |
| .add("value", getValue()) |
| .add("timestamp", getTimestamp()) |
| .add("pane", getPane()) |
| .toString(); |
| } |
| } |
| |
| /** |
| * The representation of a WindowedValue where timestamp is arbitrary and windows == a single |
| * non-Global window. |
| */ |
| private static class TimestampedValueInSingleWindow<T> extends TimestampedWindowedValue<T> |
| implements SingleWindowedValue { |
| |
| private final BoundedWindow window; |
| |
| public TimestampedValueInSingleWindow( |
| T value, Instant timestamp, BoundedWindow window, PaneInfo pane) { |
| super(value, timestamp, pane); |
| this.window = checkNotNull(window); |
| } |
| |
| @Override |
| public <NewT> WindowedValue<NewT> withValue(NewT newValue) { |
| return new TimestampedValueInSingleWindow<>(newValue, getTimestamp(), window, getPane()); |
| } |
| |
| @Override |
| public Collection<? extends BoundedWindow> getWindows() { |
| return Collections.singletonList(window); |
| } |
| |
| @Override |
| public boolean isSingleWindowedValue() { |
| return true; |
| } |
| |
| @Override |
| public BoundedWindow getWindow() { |
| return window; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (o instanceof TimestampedValueInSingleWindow) { |
| TimestampedValueInSingleWindow<?> that = (TimestampedValueInSingleWindow<?>) o; |
| // Compare timestamps first as they are most likely to differ. |
| // Also compare timestamps according to millis-since-epoch because otherwise expensive |
| // comparisons are made on their Chronology objects. |
| return this.getTimestamp().isEqual(that.getTimestamp()) |
| && Objects.equals(that.getValue(), this.getValue()) |
| && Objects.equals(that.getPane(), this.getPane()) |
| && Objects.equals(that.window, this.window); |
| } else { |
| return super.equals(o); |
| } |
| } |
| |
| @Override |
| public int hashCode() { |
| // Hash only the millis of the timestamp to be consistent with equals |
| return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), window); |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(getClass()) |
| .add("value", getValue()) |
| .add("timestamp", getTimestamp()) |
| .add("window", window) |
| .add("pane", getPane()) |
| .toString(); |
| } |
| } |
| |
| /** The representation of a WindowedValue, excluding the special cases captured above. */ |
| private static class TimestampedValueInMultipleWindows<T> extends TimestampedWindowedValue<T> { |
| private Collection<? extends BoundedWindow> windows; |
| |
| public TimestampedValueInMultipleWindows( |
| T value, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { |
| super(value, timestamp, pane); |
| this.windows = checkNotNull(windows); |
| } |
| |
| @Override |
| public <NewT> WindowedValue<NewT> withValue(NewT newValue) { |
| return new TimestampedValueInMultipleWindows<>(newValue, getTimestamp(), windows, getPane()); |
| } |
| |
| @Override |
| public Collection<? extends BoundedWindow> getWindows() { |
| return windows; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (o instanceof TimestampedValueInMultipleWindows) { |
| TimestampedValueInMultipleWindows<?> that = (TimestampedValueInMultipleWindows<?>) o; |
| // Compare timestamps first as they are most likely to differ. |
| // Also compare timestamps according to millis-since-epoch because otherwise expensive |
| // comparisons are made on their Chronology objects. |
| if (this.getTimestamp().isEqual(that.getTimestamp()) |
| && Objects.equals(that.getValue(), this.getValue()) |
| && Objects.equals(that.getPane(), this.getPane())) { |
| ensureWindowsAreASet(); |
| that.ensureWindowsAreASet(); |
| return that.windows.equals(this.windows); |
| } else { |
| return false; |
| } |
| } else { |
| return super.equals(o); |
| } |
| } |
| |
| @Override |
| public int hashCode() { |
| // Hash only the millis of the timestamp to be consistent with equals |
| ensureWindowsAreASet(); |
| return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), windows); |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(getClass()) |
| .add("value", getValue()) |
| .add("timestamp", getTimestamp()) |
| .add("windows", windows) |
| .add("pane", getPane()) |
| .toString(); |
| } |
| |
| private void ensureWindowsAreASet() { |
| if (!(windows instanceof Set)) { |
| windows = new LinkedHashSet<>(windows); |
| } |
| } |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * Returns the {@code Coder} to use for a {@code WindowedValue<T>}, using the given valueCoder and |
| * windowCoder. |
| */ |
| public static <T> FullWindowedValueCoder<T> getFullCoder( |
| Coder<T> valueCoder, Coder<? extends BoundedWindow> windowCoder) { |
| return FullWindowedValueCoder.of(valueCoder, windowCoder); |
| } |
| |
| /** Returns the {@code ValueOnlyCoder} from the given valueCoder. */ |
| public static <T> ValueOnlyWindowedValueCoder<T> getValueOnlyCoder(Coder<T> valueCoder) { |
| return ValueOnlyWindowedValueCoder.of(valueCoder); |
| } |
| |
| /** Abstract class for {@code WindowedValue} coder. */ |
| public abstract static class WindowedValueCoder<T> extends StructuredCoder<WindowedValue<T>> { |
| final Coder<T> valueCoder; |
| |
| WindowedValueCoder(Coder<T> valueCoder) { |
| this.valueCoder = checkNotNull(valueCoder); |
| } |
| |
| /** Returns the value coder. */ |
| public Coder<T> getValueCoder() { |
| return valueCoder; |
| } |
| |
| /** |
| * Returns a new {@code WindowedValueCoder} that is a copy of this one, but with a different |
| * value coder. |
| */ |
| public abstract <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder); |
| } |
| |
| /** Coder for {@code WindowedValue}. */ |
| public static class FullWindowedValueCoder<T> extends WindowedValueCoder<T> { |
| private final Coder<? extends BoundedWindow> windowCoder; |
| // Precompute and cache the coder for a list of windows. |
| private final Coder<Collection<? extends BoundedWindow>> windowsCoder; |
| |
| public static <T> FullWindowedValueCoder<T> of( |
| Coder<T> valueCoder, Coder<? extends BoundedWindow> windowCoder) { |
| return new FullWindowedValueCoder<>(valueCoder, windowCoder); |
| } |
| |
| FullWindowedValueCoder(Coder<T> valueCoder, Coder<? extends BoundedWindow> windowCoder) { |
| super(valueCoder); |
| this.windowCoder = checkNotNull(windowCoder); |
| // It's not possible to statically type-check correct use of the |
| // windowCoder (we have to ensure externally that we only get |
| // windows of the class handled by windowCoder), so type |
| // windowsCoder in a way that makes encode() and decode() work |
| // right, and cast the window type away here. |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| Coder<Collection<? extends BoundedWindow>> collectionCoder = |
| (Coder) CollectionCoder.of(this.windowCoder); |
| this.windowsCoder = collectionCoder; |
| } |
| |
| public Coder<? extends BoundedWindow> getWindowCoder() { |
| return windowCoder; |
| } |
| |
| public Coder<Collection<? extends BoundedWindow>> getWindowsCoder() { |
| return windowsCoder; |
| } |
| |
| @Override |
| public <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder) { |
| return new FullWindowedValueCoder<>(valueCoder, windowCoder); |
| } |
| |
| @Override |
| public void encode(WindowedValue<T> windowedElem, OutputStream outStream) |
| throws CoderException, IOException { |
| encode(windowedElem, outStream, Context.NESTED); |
| } |
| |
| @Override |
| public void encode(WindowedValue<T> windowedElem, OutputStream outStream, Context context) |
| throws CoderException, IOException { |
| InstantCoder.of().encode(windowedElem.getTimestamp(), outStream); |
| windowsCoder.encode(windowedElem.getWindows(), outStream); |
| PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream); |
| valueCoder.encode(windowedElem.getValue(), outStream, context); |
| } |
| |
| @Override |
| public WindowedValue<T> decode(InputStream inStream) throws CoderException, IOException { |
| return decode(inStream, Context.NESTED); |
| } |
| |
| @Override |
| public WindowedValue<T> decode(InputStream inStream, Context context) |
| throws CoderException, IOException { |
| Instant timestamp = InstantCoder.of().decode(inStream); |
| Collection<? extends BoundedWindow> windows = windowsCoder.decode(inStream); |
| PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream); |
| T value = valueCoder.decode(inStream, context); |
| |
| // Because there are some remaining (incorrect) uses of WindowedValue with no windows, |
| // we call this deprecated no-validation path when decoding |
| return WindowedValue.createWithoutValidation(value, timestamp, windows, pane); |
| } |
| |
| @Override |
| public void verifyDeterministic() throws NonDeterministicException { |
| verifyDeterministic( |
| this, "FullWindowedValueCoder requires a deterministic valueCoder", valueCoder); |
| verifyDeterministic( |
| this, "FullWindowedValueCoder requires a deterministic windowCoder", windowCoder); |
| } |
| |
| @Override |
| public void registerByteSizeObserver(WindowedValue<T> value, ElementByteSizeObserver observer) |
| throws Exception { |
| InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer); |
| windowsCoder.registerByteSizeObserver(value.getWindows(), observer); |
| PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer); |
| valueCoder.registerByteSizeObserver(value.getValue(), observer); |
| } |
| |
| /** |
| * {@inheritDoc}. |
| * |
| * @return a singleton list containing the {@code valueCoder} of this {@link |
| * FullWindowedValueCoder}. |
| */ |
| @Override |
| public List<? extends Coder<?>> getCoderArguments() { |
| // The value type is the only generic type parameter exposed by this coder. The component |
| // coders include the window coder as well |
| return Collections.singletonList(valueCoder); |
| } |
| |
| @Override |
| public List<? extends Coder<?>> getComponents() { |
| return Arrays.asList(valueCoder, windowCoder); |
| } |
| } |
| |
| /** |
| * Coder for {@code WindowedValue}. |
| * |
| * <p>A {@code ValueOnlyWindowedValueCoder} only encodes and decodes the value. It drops timestamp |
| * and windows for encoding, and uses defaults timestamp, and windows for decoding. |
| */ |
| public static class ValueOnlyWindowedValueCoder<T> extends WindowedValueCoder<T> { |
| public static <T> ValueOnlyWindowedValueCoder<T> of(Coder<T> valueCoder) { |
| return new ValueOnlyWindowedValueCoder<>(valueCoder); |
| } |
| |
| ValueOnlyWindowedValueCoder(Coder<T> valueCoder) { |
| super(valueCoder); |
| } |
| |
| @Override |
| public <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder) { |
| return new ValueOnlyWindowedValueCoder<>(valueCoder); |
| } |
| |
| @Override |
| public void encode(WindowedValue<T> windowedElem, OutputStream outStream) |
| throws CoderException, IOException { |
| encode(windowedElem, outStream, Context.NESTED); |
| } |
| |
| @Override |
| public void encode(WindowedValue<T> windowedElem, OutputStream outStream, Context context) |
| throws CoderException, IOException { |
| valueCoder.encode(windowedElem.getValue(), outStream, context); |
| } |
| |
| @Override |
| public WindowedValue<T> decode(InputStream inStream) throws CoderException, IOException { |
| return decode(inStream, Context.NESTED); |
| } |
| |
| @Override |
| public WindowedValue<T> decode(InputStream inStream, Context context) |
| throws CoderException, IOException { |
| T value = valueCoder.decode(inStream, context); |
| return WindowedValue.valueInGlobalWindow(value); |
| } |
| |
| @Override |
| public void verifyDeterministic() throws NonDeterministicException { |
| verifyDeterministic( |
| this, "ValueOnlyWindowedValueCoder requires a deterministic valueCoder", valueCoder); |
| } |
| |
| @Override |
| public void registerByteSizeObserver(WindowedValue<T> value, ElementByteSizeObserver observer) |
| throws Exception { |
| valueCoder.registerByteSizeObserver(value.getValue(), observer); |
| } |
| |
| @Override |
| public List<? extends Coder<?>> getCoderArguments() { |
| return Collections.singletonList(valueCoder); |
| } |
| } |
| } |