blob: df88e21772dcd0a8152cb8982e9ce92c749ed524 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.values;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.AbstractList;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.RandomAccess;
import java.util.Set;
import java.util.SortedMap;
import java.util.function.Supplier;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.Materialization;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.Materializations.IterableView;
import org.apache.beam.sdk.transforms.Materializations.MultimapView;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSortedMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
* <b>For internal use only; no backwards compatibility guarantees.</b>
*
* <p>Implementations of {@link PCollectionView} shared across the SDK.
*/
@Internal
@SuppressWarnings({
"keyfor",
"nullness", // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
"rawtypes"
})
public class PCollectionViews {
public interface TypeDescriptorSupplier<T> extends Supplier<TypeDescriptor<T>>, Serializable {}
/**
* Returns a {@code PCollectionView<T>} capable of processing elements windowed using the provided
* {@link WindowingStrategy}.
*
* <p>If {@code hasDefault} is {@code true}, then the view will take on the value {@code
* defaultValue} for any empty windows.
*/
public static <T, W extends BoundedWindow> PCollectionView<T> singletonView(
PCollection<T> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy,
boolean hasDefault,
@Nullable T defaultValue,
Coder<T> defaultValueCoder) {
return new SimplePCollectionView<>(
pCollection,
new SingletonViewFn2<>(hasDefault, defaultValue, defaultValueCoder, typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}
/**
* Returns a {@code PCollectionView<T>} capable of processing elements windowed using the provided
* {@link WindowingStrategy}.
*
* <p>If {@code hasDefault} is {@code true}, then the view will take on the value {@code
* defaultValue} for any empty windows.
*
* @deprecated See {@link #singletonView}.
*/
@Deprecated
public static <T, W extends BoundedWindow> PCollectionView<T> singletonViewUsingVoidKey(
PCollection<KV<Void, T>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy,
boolean hasDefault,
@Nullable T defaultValue,
Coder<T> defaultValueCoder) {
return new SimplePCollectionView<>(
pCollection,
new SingletonViewFn<>(hasDefault, defaultValue, defaultValueCoder, typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}
/**
* Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements windowed using
* the provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView(
PCollection<T> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new IterableViewFn2<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}
/**
* Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements windowed using
* the provided {@link WindowingStrategy}.
*
* @deprecated See {@link #iterableView}.
*/
@Deprecated
public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableViewUsingVoidKey(
PCollection<KV<Void, T>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new IterableViewFn<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}
/**
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new ListViewFn2<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}
/**
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*
* @deprecated See {@link #listView}.
*/
@Deprecated
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewUsingVoidKey(
PCollection<KV<Void, T>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new ListViewFn<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}
/**
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*
* @deprecated See {@link #listView}.
*/
@Deprecated
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewUsingVoidKey(
PCollection<KV<Void, T>> pCollection,
TupleTag<MultimapView<Void, T>> tag,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
tag,
new ListViewFn<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}
/**
* Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*/
public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView(
PCollection<KV<K, V>> pCollection,
TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
TypeDescriptorSupplier<V> valueTypeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new MapViewFn2<>(keyTypeDescriptorSupplier, valueTypeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}
/**
* Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*
* @deprecated See {@link #mapView}.
*/
@Deprecated
public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapViewUsingVoidKey(
PCollection<KV<Void, KV<K, V>>> pCollection,
TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
TypeDescriptorSupplier<V> valueTypeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new MapViewFn<>(keyTypeDescriptorSupplier, valueTypeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}
/**
* Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements windowed
* using the provided {@link WindowingStrategy}.
*/
public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView(
PCollection<KV<K, V>> pCollection,
TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
TypeDescriptorSupplier<V> valueTypeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new MultimapViewFn2<>(keyTypeDescriptorSupplier, valueTypeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}
/**
* Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements windowed
* using the provided {@link WindowingStrategy}.
*
* @deprecated See {@link #multimapView}.
*/
@Deprecated
public static <K, V, W extends BoundedWindow>
PCollectionView<Map<K, Iterable<V>>> multimapViewUsingVoidKey(
PCollection<KV<Void, KV<K, V>>> pCollection,
TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
TypeDescriptorSupplier<V> valueTypeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new MultimapViewFn<>(keyTypeDescriptorSupplier, valueTypeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}
/**
* Expands a list of {@link PCollectionView} into the form needed for {@link
* PTransform#getAdditionalInputs()}.
*/
public static Map<TupleTag<?>, PValue> toAdditionalInputs(Iterable<PCollectionView<?>> views) {
ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
for (PCollectionView<?> view : views) {
additionalInputs.put(view.getTagInternal(), view.getPCollection());
}
return additionalInputs.build();
}
/**
* Implementation which is able to adapt an iterable materialization to a {@code T}.
*
* <p>For internal use only.
*
* <p>Instantiate via {@link PCollectionViews#singletonView}.
*
* <p>{@link SingletonViewFn} is meant to be removed in the future and replaced with this class.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
private static class SingletonViewFn2<T> extends ViewFn<IterableView<T>, T> {
private byte @Nullable [] encodedDefaultValue;
private transient @Nullable T defaultValue;
private @Nullable Coder<T> valueCoder;
private boolean hasDefault;
private TypeDescriptorSupplier<T> typeDescriptorSupplier;
private SingletonViewFn2(
boolean hasDefault,
T defaultValue,
Coder<T> valueCoder,
TypeDescriptorSupplier<T> typeDescriptorSupplier) {
this.hasDefault = hasDefault;
this.defaultValue = defaultValue;
this.valueCoder = valueCoder;
this.typeDescriptorSupplier = typeDescriptorSupplier;
if (hasDefault) {
try {
this.encodedDefaultValue = CoderUtils.encodeToByteArray(valueCoder, defaultValue);
} catch (IOException e) {
throw new RuntimeException("Unexpected IOException: ", e);
}
}
}
/** Returns if a default value was specified. */
@Internal
public boolean hasDefault() {
return hasDefault;
}
/**
* Returns the default value that was specified.
*
* <p>For internal use only.
*
* @throws NoSuchElementException if no default was specified.
*/
public T getDefaultValue() {
if (!hasDefault) {
throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
}
// Lazily decode the default value once
synchronized (this) {
if (encodedDefaultValue != null) {
try {
defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue);
// Clear the encoded default value to free the reference once we have the object
// version. Also, this will guarantee that the value will only be decoded once.
encodedDefaultValue = null;
} catch (IOException e) {
throw new RuntimeException("Unexpected IOException: ", e);
}
}
return defaultValue;
}
}
@Override
public Materialization<IterableView<T>> getMaterialization() {
return Materializations.iterable();
}
@Override
public T apply(IterableView<T> primitiveViewT) {
try {
return Iterables.getOnlyElement(primitiveViewT.get());
} catch (NoSuchElementException exc) {
return getDefaultValue();
} catch (IllegalArgumentException exc) {
throw new IllegalArgumentException(
"PCollection with more than one element accessed as a singleton view.");
}
}
@Override
public TypeDescriptor<T> getTypeDescriptor() {
return typeDescriptorSupplier.get();
}
}
/**
* Implementation which is able to adapt a multimap materialization to a {@code T}.
*
* <p>For internal use only.
*
* @deprecated See {@link SingletonViewFn2}.
*/
@Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
public static class SingletonViewFn<T> extends ViewFn<MultimapView<Void, T>, T> {
private byte @Nullable [] encodedDefaultValue;
private transient @Nullable T defaultValue;
private @Nullable Coder<T> valueCoder;
private boolean hasDefault;
private TypeDescriptorSupplier<T> typeDescriptorSupplier;
private SingletonViewFn(
boolean hasDefault,
T defaultValue,
Coder<T> valueCoder,
TypeDescriptorSupplier<T> typeDescriptorSupplier) {
this.hasDefault = hasDefault;
this.defaultValue = defaultValue;
this.valueCoder = valueCoder;
this.typeDescriptorSupplier = typeDescriptorSupplier;
if (hasDefault) {
try {
this.encodedDefaultValue = CoderUtils.encodeToByteArray(valueCoder, defaultValue);
} catch (IOException e) {
throw new RuntimeException("Unexpected IOException: ", e);
}
}
}
/** Returns if a default value was specified. */
@Internal
public boolean hasDefault() {
return hasDefault;
}
/**
* Returns the default value that was specified.
*
* <p>For internal use only.
*
* @throws NoSuchElementException if no default was specified.
*/
public T getDefaultValue() {
if (!hasDefault) {
throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
}
// Lazily decode the default value once
synchronized (this) {
if (encodedDefaultValue != null) {
try {
defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue);
// Clear the encoded default value to free the reference once we have the object
// version. Also, this will guarantee that the value will only be decoded once.
encodedDefaultValue = null;
} catch (IOException e) {
throw new RuntimeException("Unexpected IOException: ", e);
}
}
return defaultValue;
}
}
@Override
public Materialization<MultimapView<Void, T>> getMaterialization() {
return Materializations.multimap();
}
@Override
public T apply(MultimapView<Void, T> primitiveViewT) {
try {
return Iterables.getOnlyElement(primitiveViewT.get(null));
} catch (NoSuchElementException exc) {
return getDefaultValue();
} catch (IllegalArgumentException exc) {
throw new IllegalArgumentException(
"PCollection with more than one element accessed as a singleton view.");
}
}
@Override
public TypeDescriptor<T> getTypeDescriptor() {
return typeDescriptorSupplier.get();
}
}
/**
* Implementation which is able to adapt an iterable materialization to a {@code Iterable<T>}.
*
* <p>For internal use only.
*
* <p>Instantiate via {@link PCollectionViews#iterableView}.
*
* <p>{@link IterableViewFn} is meant to be removed in the future and replaced with this class.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
private static class IterableViewFn2<T> extends ViewFn<IterableView<T>, Iterable<T>> {
private TypeDescriptorSupplier<T> typeDescriptorSupplier;
public IterableViewFn2(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
this.typeDescriptorSupplier = typeDescriptorSupplier;
}
@Override
public Materialization<IterableView<T>> getMaterialization() {
return Materializations.iterable();
}
@Override
public Iterable<T> apply(IterableView<T> primitiveViewT) {
return Iterables.unmodifiableIterable(primitiveViewT.get());
}
@Override
public TypeDescriptor<Iterable<T>> getTypeDescriptor() {
return TypeDescriptors.iterables(typeDescriptorSupplier.get());
}
}
/**
* Implementation which is able to adapt a multimap materialization to a {@code Iterable<T>}.
*
* <p>For internal use only.
*
* @deprecated See {@link IterableViewFn2}.
*/
@Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
public static class IterableViewFn<T> extends ViewFn<MultimapView<Void, T>, Iterable<T>> {
private TypeDescriptorSupplier<T> typeDescriptorSupplier;
public IterableViewFn(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
this.typeDescriptorSupplier = typeDescriptorSupplier;
}
@Override
public Materialization<MultimapView<Void, T>> getMaterialization() {
return Materializations.multimap();
}
@Override
public Iterable<T> apply(MultimapView<Void, T> primitiveViewT) {
return Iterables.unmodifiableIterable(primitiveViewT.get(null));
}
@Override
public TypeDescriptor<Iterable<T>> getTypeDescriptor() {
return TypeDescriptors.iterables(typeDescriptorSupplier.get());
}
}
/**
* Implementation which is able to adapt a multimap materialization to a {@code List<T>}.
*
* <p>For internal use only.
*
* <p>Instantiate via {@link PCollectionViews#listView}.
*
* <p>{@link ListViewFn} is meant to be removed in the future and replaced with this class.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
@VisibleForTesting
static class ListViewFn2<T>
extends ViewFn<MultimapView<Long, ValueOrMetadata<T, OffsetRange>>, List<T>> {
private TypeDescriptorSupplier<T> typeDescriptorSupplier;
public ListViewFn2(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
this.typeDescriptorSupplier = typeDescriptorSupplier;
}
@Override
public Materialization<MultimapView<Long, ValueOrMetadata<T, OffsetRange>>>
getMaterialization() {
return Materializations.multimap();
}
@Override
public List<T> apply(MultimapView<Long, ValueOrMetadata<T, OffsetRange>> primitiveViewT) {
return Collections.unmodifiableList(new ListOverMultimapView<>(primitiveViewT));
}
@Override
public TypeDescriptor<List<T>> getTypeDescriptor() {
return TypeDescriptors.lists(typeDescriptorSupplier.get());
}
/**
* A {@link List} adapter over a {@link MultimapView}.
*
* <p>See {@link View.AsList} for a description of the materialized format and {@code index} to
* {@code (position, sub-position)} mapping details.
*/
private static class ListOverMultimapView<T> extends AbstractList<T> implements RandomAccess {
private final MultimapView<Long, ValueOrMetadata<T, OffsetRange>> primitiveView;
/**
* A mapping from non over-lapping ranges to the number of elements at each position within
* that range. Ranges not specified in the mapping implicitly have 0 elements at those
* positions.
*
* <p>Used to quickly compute the {@code index} -> {@code (position, sub-position} within the
* map.
*/
private final Supplier<SortedMap<OffsetRange, Integer>>
nonOverlappingRangesToNumElementsPerPosition;
private final Supplier<Integer> size;
private ListOverMultimapView(
MultimapView<Long, ValueOrMetadata<T, OffsetRange>> primitiveView) {
this.primitiveView = primitiveView;
this.nonOverlappingRangesToNumElementsPerPosition =
Suppliers.memoize(
() ->
computeOverlappingRanges(
Iterables.transform(
primitiveView.get(Long.MIN_VALUE), (value) -> value.getMetadata())));
this.size =
Suppliers.memoize(
() -> computeTotalNumElements(nonOverlappingRangesToNumElementsPerPosition.get()));
}
@Override
public T get(int index) {
if (index < 0 || index >= size.get()) {
throw new IndexOutOfBoundsException();
}
KV<Long, Integer> position =
computePositionForIndex(nonOverlappingRangesToNumElementsPerPosition.get(), index);
return Iterables.get(primitiveView.get(position.getKey()), position.getValue()).get();
}
@Override
public int size() {
return size.get();
}
@Override
public Iterator<T> iterator() {
return listIterator();
}
@Override
public ListIterator<T> listIterator() {
return super.listIterator();
}
/** A {@link ListIterator} over {@link MultimapView} adapter. */
private class ListIteratorOverMultimapView implements ListIterator<T> {
private int position;
@Override
public boolean hasNext() {
return position < size();
}
@Override
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
T rval = get(position);
position += 1;
return rval;
}
@Override
public boolean hasPrevious() {
return position > 0;
}
@Override
public T previous() {
if (!hasPrevious()) {
throw new NoSuchElementException();
}
position -= 1;
return get(position);
}
@Override
public int nextIndex() {
return position;
}
@Override
public int previousIndex() {
return position - 1;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public void set(T e) {
throw new UnsupportedOperationException();
}
@Override
public void add(T e) {
throw new UnsupportedOperationException();
}
}
}
}
/**
* Compares {@link OffsetRange}s such that ranges are ordered by the smallest {@code from} and in
* case of a tie the smallest {@code to}.
*/
@VisibleForTesting
static class OffsetRangeComparator implements Comparator<OffsetRange> {
private static final OffsetRangeComparator INSTANCE = new OffsetRangeComparator();
@Override
public int compare(OffsetRange o1, OffsetRange o2) {
int fromComparison = Longs.compare(o1.getFrom(), o2.getFrom());
if (fromComparison != 0) {
return fromComparison;
}
return Longs.compare(o1.getTo(), o2.getTo());
}
}
@VisibleForTesting
static SortedMap<OffsetRange, Integer> computeOverlappingRanges(Iterable<OffsetRange> ranges) {
ImmutableSortedMap.Builder<OffsetRange, Integer> rval =
ImmutableSortedMap.orderedBy(OffsetRangeComparator.INSTANCE);
List<OffsetRange> sortedRanges = Lists.newArrayList(ranges);
if (sortedRanges.isEmpty()) {
return rval.build();
}
Collections.sort(sortedRanges, OffsetRangeComparator.INSTANCE);
// Stores ranges in smallest 'from' and then smallest 'to' order
// e.g. [2, 7), [3, 4), [3, 5), [3, 5), [3, 6), [4, 0)
PriorityQueue<OffsetRange> rangesWithSameFrom =
new PriorityQueue<>(OffsetRangeComparator.INSTANCE);
Iterator<OffsetRange> iterator = sortedRanges.iterator();
// Stored in reverse sorted order so that when we iterate and re-add them back to
// overlappingRanges they are stored in sorted order from smallest to largest range.to
List<OffsetRange> rangesToProcess = new ArrayList<>();
while (iterator.hasNext()) {
OffsetRange current = iterator.next();
// Skip empty ranges
if (current.getFrom() == current.getTo()) {
continue;
}
// If the current range has a different 'from' then a prior range then we must produce
// ranges in [rangesWithSameFrom.from, current.from)
while (!rangesWithSameFrom.isEmpty()
&& rangesWithSameFrom.peek().getFrom() != current.getFrom()) {
rangesToProcess.addAll(rangesWithSameFrom);
Collections.sort(rangesToProcess, OffsetRangeComparator.INSTANCE);
rangesWithSameFrom.clear();
int i = 0;
long lastTo = rangesToProcess.get(i).getFrom();
// Output all the ranges that are strictly less then current.from
// e.g. current.to := 7 for [3, 4), [3, 5), [3, 5), [3, 6) will produce
// [3, 4) := 4
// [4, 5) := 3
// [5, 6) := 1
for (; i < rangesToProcess.size(); ++i) {
if (rangesToProcess.get(i).getTo() > current.getFrom()) {
break;
}
// Output only the first of any subsequent duplicate ranges
if (i == 0 || rangesToProcess.get(i - 1).getTo() != rangesToProcess.get(i).getTo()) {
rval.put(
new OffsetRange(lastTo, rangesToProcess.get(i).getTo()),
rangesToProcess.size() - i);
lastTo = rangesToProcess.get(i).getTo();
}
}
// We exitted the loop with 'to' > current.from, we must add the range [lastTo,
// current.from) if it is non-empty
if (lastTo < current.getFrom() && i != rangesToProcess.size()) {
rval.put(new OffsetRange(lastTo, current.getFrom()), rangesToProcess.size() - i);
}
// The remaining ranges have a 'to' that is greater then 'current.from' and will overlap
// with current so add them back to rangesWithSameFrom with the updated 'from'
for (; i < rangesToProcess.size(); ++i) {
rangesWithSameFrom.add(
new OffsetRange(current.getFrom(), rangesToProcess.get(i).getTo()));
}
rangesToProcess.clear();
}
rangesWithSameFrom.add(current);
}
// Process the last chunk of overlapping ranges
while (!rangesWithSameFrom.isEmpty()) {
// This range always represents the range with with the smallest 'to'
OffsetRange current = rangesWithSameFrom.remove();
rangesToProcess.addAll(rangesWithSameFrom);
Collections.sort(rangesToProcess, OffsetRangeComparator.INSTANCE);
rangesWithSameFrom.clear();
rval.put(current, rangesToProcess.size() + 1 /* include current */);
// Shorten all the remaining ranges such that they start with current.to
for (OffsetRange rangeWithDifferentFrom : rangesToProcess) {
// Skip any duplicates of current
if (rangeWithDifferentFrom.getTo() > current.getTo()) {
rangesWithSameFrom.add(new OffsetRange(current.getTo(), rangeWithDifferentFrom.getTo()));
}
}
rangesToProcess.clear();
}
return rval.build();
}
@VisibleForTesting
static int computeTotalNumElements(
Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition) {
long sum = 0;
for (Map.Entry<OffsetRange, Integer> range :
nonOverlappingRangesToNumElementsPerPosition.entrySet()) {
sum +=
Math.multiplyExact(
Math.subtractExact(range.getKey().getTo(), range.getKey().getFrom()),
range.getValue());
}
return Ints.checkedCast(sum);
}
@VisibleForTesting
static KV<Long, Integer> computePositionForIndex(
Map<OffsetRange, Integer> nonOverlappingRangesToNumElementsPerPosition, int index) {
if (index < 0) {
throw new IndexOutOfBoundsException(
String.format(
"Position %s was out of bounds for ranges %s.",
index, nonOverlappingRangesToNumElementsPerPosition));
}
for (Map.Entry<OffsetRange, Integer> range :
nonOverlappingRangesToNumElementsPerPosition.entrySet()) {
int numElementsInRange =
Ints.checkedCast(
Math.multiplyExact(
Math.subtractExact(range.getKey().getTo(), range.getKey().getFrom()),
range.getValue()));
if (numElementsInRange <= index) {
index -= numElementsInRange;
continue;
}
long position = range.getKey().getFrom() + index / range.getValue();
int subPosition = index % range.getValue();
return KV.of(position, subPosition);
}
throw new IndexOutOfBoundsException(
String.format(
"Position %s was out of bounds for ranges %s.",
index, nonOverlappingRangesToNumElementsPerPosition));
}
/** Stores values or metadata about values. */
public static class ValueOrMetadata<T, MetaT> {
private final T value;
private final boolean isMetadata;
private final MetaT metadata;
public static <T, MetaT> ValueOrMetadata<T, MetaT> create(T value) {
return new ValueOrMetadata(false, value, null);
}
public static <T, MetaT> ValueOrMetadata<T, MetaT> createMetadata(MetaT metadata) {
return new ValueOrMetadata(true, null, metadata);
}
public ValueOrMetadata(boolean isMetadata, T value, MetaT metadata) {
this.isMetadata = isMetadata;
this.value = value;
this.metadata = metadata;
}
public T get() {
checkState(!isMetadata);
return value;
}
public boolean isMetadata() {
return isMetadata;
}
public MetaT getMetadata() {
checkState(isMetadata);
return metadata;
}
}
/** A coder for {@link ValueOrMetadata}. */
public static class ValueOrMetadataCoder<T, MetaT>
extends StructuredCoder<ValueOrMetadata<T, MetaT>> {
public static <T, MetaT> ValueOrMetadataCoder<T, MetaT> create(
Coder<T> valueCoder, Coder<MetaT> metadataCoder) {
return new ValueOrMetadataCoder<>(valueCoder, metadataCoder);
}
private final Coder<T> valueCoder;
private final Coder<MetaT> metadataCoder;
private ValueOrMetadataCoder(Coder<T> valueCoder, Coder<MetaT> metadataCoder) {
this.valueCoder = valueCoder;
this.metadataCoder = metadataCoder;
}
@Override
public void encode(ValueOrMetadata<T, MetaT> value, OutputStream outStream)
throws CoderException, IOException {
BooleanCoder.of().encode(value.isMetadata(), outStream);
if (value.isMetadata()) {
metadataCoder.encode(value.getMetadata(), outStream);
} else {
valueCoder.encode(value.get(), outStream);
}
}
@Override
public ValueOrMetadata<T, MetaT> decode(InputStream inStream)
throws CoderException, IOException {
boolean isMetadata = BooleanCoder.of().decode(inStream);
if (isMetadata) {
return ValueOrMetadata.createMetadata(metadataCoder.decode(inStream));
} else {
return ValueOrMetadata.create(valueCoder.decode(inStream));
}
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
return Arrays.asList(valueCoder, metadataCoder);
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(valueCoder, "value coder");
verifyDeterministic(metadataCoder, "metadata coder");
}
}
/**
* Implementation which is able to adapt a multimap materialization to a {@code List<T>}.
*
* <p>For internal use only.
*
* @deprecated See {@link ListViewFn2}.
*/
@Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
public static class ListViewFn<T> extends ViewFn<MultimapView<Void, T>, List<T>> {
private TypeDescriptorSupplier<T> typeDescriptorSupplier;
public ListViewFn(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
this.typeDescriptorSupplier = typeDescriptorSupplier;
}
@Override
public Materialization<MultimapView<Void, T>> getMaterialization() {
return Materializations.multimap();
}
@Override
public List<T> apply(MultimapView<Void, T> primitiveViewT) {
List<T> list = new ArrayList<>();
for (T t : primitiveViewT.get(null)) {
list.add(t);
}
return Collections.unmodifiableList(list);
}
@Override
public TypeDescriptor<List<T>> getTypeDescriptor() {
return TypeDescriptors.lists(typeDescriptorSupplier.get());
}
@Override
public boolean equals(@Nullable Object other) {
return other instanceof ListViewFn;
}
@Override
public int hashCode() {
return ListViewFn.class.hashCode();
}
}
/**
* Implementation which is able to adapt a multimap materialization to a {@code Map<K,
* Iterable<V>>}.
*
* <p>For internal use only.
*
* <p>Instantiate via {@link PCollectionViews#multimapView}.
*
* <p>{@link MultimapViewFn} is meant to be removed in the future and replaced with this class.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
private static class MultimapViewFn2<K, V>
extends ViewFn<MultimapView<K, V>, Map<K, Iterable<V>>> {
private TypeDescriptorSupplier<K> keyTypeDescriptorSupplier;
private TypeDescriptorSupplier<V> valueTypeDescriptorSupplier;
public MultimapViewFn2(
TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
TypeDescriptorSupplier<V> valueTypeDescriptorSupplier) {
this.keyTypeDescriptorSupplier = keyTypeDescriptorSupplier;
this.valueTypeDescriptorSupplier = valueTypeDescriptorSupplier;
}
@Override
public Materialization<MultimapView<K, V>> getMaterialization() {
return Materializations.multimap();
}
@Override
public Map<K, Iterable<V>> apply(MultimapView<K, V> primitiveViewT) {
return Collections.unmodifiableMap(new MultimapViewToMultimapAdapter<>(primitiveViewT));
}
@Override
public TypeDescriptor<Map<K, Iterable<V>>> getTypeDescriptor() {
return TypeDescriptors.maps(
keyTypeDescriptorSupplier.get(),
TypeDescriptors.iterables(valueTypeDescriptorSupplier.get()));
}
}
/**
* Implementation which is able to adapt a multimap materialization to a {@code Map<K,
* Iterable<V>>}.
*
* <p>For internal use only.
*
* @deprecated See {@link MultimapViewFn2}.
*/
@Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
public static class MultimapViewFn<K, V>
extends ViewFn<MultimapView<Void, KV<K, V>>, Map<K, Iterable<V>>> {
private TypeDescriptorSupplier<K> keyTypeDescriptorSupplier;
private TypeDescriptorSupplier<V> valueTypeDescriptorSupplier;
public MultimapViewFn(
TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
TypeDescriptorSupplier<V> valueTypeDescriptorSupplier) {
this.keyTypeDescriptorSupplier = keyTypeDescriptorSupplier;
this.valueTypeDescriptorSupplier = valueTypeDescriptorSupplier;
}
@Override
public Materialization<MultimapView<Void, KV<K, V>>> getMaterialization() {
return Materializations.multimap();
}
@Override
public Map<K, Iterable<V>> apply(MultimapView<Void, KV<K, V>> primitiveViewT) {
// TODO: BEAM-3071 - fix this so that we aren't relying on Java equality and are
// using structural value equality.
Multimap<K, V> multimap = ArrayListMultimap.create();
for (KV<K, V> elem : primitiveViewT.get(null)) {
multimap.put(elem.getKey(), elem.getValue());
}
// Safe covariant cast that Java cannot express without rawtypes, even with unchecked casts
@SuppressWarnings({"unchecked", "rawtypes"})
Map<K, Iterable<V>> resultMap = (Map) multimap.asMap();
return Collections.unmodifiableMap(resultMap);
}
@Override
public TypeDescriptor<Map<K, Iterable<V>>> getTypeDescriptor() {
return TypeDescriptors.maps(
keyTypeDescriptorSupplier.get(),
TypeDescriptors.iterables(valueTypeDescriptorSupplier.get()));
}
}
/**
* Implementation which is able to adapt a multimap materialization to a {@code Map<K, V>}.
*
* <p>For internal use only.
*
* <p>Instantiate via {@link PCollectionViews#mapView}.
*
* <p>{@link MapViewFn} is meant to be removed in the future and replaced with this class.
*/
private static class MapViewFn2<K, V> extends ViewFn<MultimapView<K, V>, Map<K, V>> {
private TypeDescriptorSupplier<K> keyTypeDescriptorSupplier;
private TypeDescriptorSupplier<V> valueTypeDescriptorSupplier;
public MapViewFn2(
TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
TypeDescriptorSupplier<V> valueTypeDescriptorSupplier) {
this.keyTypeDescriptorSupplier = keyTypeDescriptorSupplier;
this.valueTypeDescriptorSupplier = valueTypeDescriptorSupplier;
}
@Override
public Materialization<MultimapView<K, V>> getMaterialization() {
return Materializations.multimap();
}
@Override
public Map<K, V> apply(MultimapView<K, V> primitiveViewT) {
return Collections.unmodifiableMap(new MultimapViewToMapAdapter<>(primitiveViewT));
}
@Override
public TypeDescriptor<Map<K, V>> getTypeDescriptor() {
return TypeDescriptors.maps(
keyTypeDescriptorSupplier.get(), valueTypeDescriptorSupplier.get());
}
}
/**
* Implementation which is able to adapt a multimap materialization to a {@code Map<K, V>}.
*
* <p>For internal use only.
*
* @deprecated See {@link MapViewFn2}.
*/
@Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
public static class MapViewFn<K, V> extends ViewFn<MultimapView<Void, KV<K, V>>, Map<K, V>> {
private TypeDescriptorSupplier<K> keyTypeDescriptorSupplier;
private TypeDescriptorSupplier<V> valueTypeDescriptorSupplier;
public MapViewFn(
TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
TypeDescriptorSupplier<V> valueTypeDescriptorSupplier) {
this.keyTypeDescriptorSupplier = keyTypeDescriptorSupplier;
this.valueTypeDescriptorSupplier = valueTypeDescriptorSupplier;
}
@Override
public Materialization<MultimapView<Void, KV<K, V>>> getMaterialization() {
return Materializations.multimap();
}
@Override
public Map<K, V> apply(MultimapView<Void, KV<K, V>> primitiveViewT) {
// TODO: BEAM-3071 - fix this so that we aren't relying on Java equality and are
// using structural value equality.
Map<K, V> map = new HashMap<>();
for (KV<K, V> elem : primitiveViewT.get(null)) {
if (map.containsKey(elem.getKey())) {
throw new IllegalArgumentException("Duplicate values for " + elem.getKey());
}
map.put(elem.getKey(), elem.getValue());
}
return Collections.unmodifiableMap(map);
}
@Override
public TypeDescriptor<Map<K, V>> getTypeDescriptor() {
return TypeDescriptors.maps(
keyTypeDescriptorSupplier.get(), valueTypeDescriptorSupplier.get());
}
}
/**
* A class for {@link PCollectionView} implementations, with additional type parameters that are
* not visible at pipeline assembly time when the view is used as a side input.
*
* <p>For internal use only.
*/
public static class SimplePCollectionView<ElemT, PrimitiveViewT, ViewT, W extends BoundedWindow>
extends PValueBase implements PCollectionView<ViewT> {
/** The {@link PCollection} this view was originally created from. */
private transient PCollection<ElemT> pCollection;
/** A unique tag for the view, typed according to the elements underlying the view. */
private TupleTag<PrimitiveViewT> tag;
private WindowMappingFn<W> windowMappingFn;
/** The windowing strategy for the PCollection underlying the view. */
private WindowingStrategy<?, W> windowingStrategy;
/** The coder for the elements underlying the view. */
private @Nullable Coder<ElemT> coder;
/** The typed {@link ViewFn} for this view. */
private ViewFn<PrimitiveViewT, ViewT> viewFn;
/**
* Call this constructor to initialize the fields for which this base class provides boilerplate
* accessors.
*/
private SimplePCollectionView(
PCollection<ElemT> pCollection,
TupleTag<PrimitiveViewT> tag,
ViewFn<PrimitiveViewT, ViewT> viewFn,
WindowMappingFn<W> windowMappingFn,
WindowingStrategy<?, W> windowingStrategy) {
super(pCollection.getPipeline());
this.pCollection = pCollection;
this.windowMappingFn = windowMappingFn;
this.tag = tag;
this.windowingStrategy = windowingStrategy;
this.viewFn = viewFn;
this.coder = pCollection.getCoder();
}
/**
* Call this constructor to initialize the fields for which this base class provides boilerplate
* accessors, with an auto-generated tag.
*/
private SimplePCollectionView(
PCollection<ElemT> pCollection,
ViewFn<PrimitiveViewT, ViewT> viewFn,
WindowMappingFn<W> windowMappingFn,
WindowingStrategy<?, W> windowingStrategy) {
this(pCollection, new TupleTag<>(), viewFn, windowMappingFn, windowingStrategy);
}
@Override
public ViewFn<PrimitiveViewT, ViewT> getViewFn() {
return viewFn;
}
@Override
public WindowMappingFn<?> getWindowMappingFn() {
return windowMappingFn;
}
@Override
public PCollection<?> getPCollection() {
return pCollection;
}
/**
* Returns a unique {@link TupleTag} identifying this {@link PCollectionView}.
*
* <p>For internal use only by runner implementors.
*/
@Override
public TupleTag<?> getTagInternal() {
return tag;
}
/**
* Returns the {@link WindowingStrategy} of this {@link PCollectionView}, which should be that
* of the underlying {@link PCollection}.
*
* <p>For internal use only by runner implementors.
*/
@Override
public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
return windowingStrategy;
}
@Override
public Coder<?> getCoderInternal() {
return coder;
}
@Override
public int hashCode() {
return Objects.hash(tag);
}
@Override
public boolean equals(@Nullable Object other) {
if (!(other instanceof PCollectionView)) {
return false;
}
@SuppressWarnings("unchecked")
PCollectionView<?> otherView = (PCollectionView<?>) other;
return tag.equals(otherView.getTagInternal());
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("tag", tag).toString();
}
@Override
public Map<TupleTag<?>, PValue> expand() {
return Collections.singletonMap(tag, pCollection);
}
}
/** A {@link MultimapView} to {@link Map Map<K, V>} adapter. */
private static class MultimapViewToMapAdapter<K, V> extends AbstractMap<K, V> {
private final MultimapView<K, V> primitiveViewT;
private final Supplier<Integer> size;
private MultimapViewToMapAdapter(MultimapView<K, V> primitiveViewT) {
this.primitiveViewT = primitiveViewT;
this.size = Suppliers.memoize(() -> Iterables.size(primitiveViewT.get()));
}
@Override
public boolean containsKey(Object key) {
return primitiveViewT.get((K) key).iterator().hasNext();
}
@Override
public V get(Object key) {
Iterator<V> iterator = primitiveViewT.get((K) key).iterator();
if (!iterator.hasNext()) {
return null;
}
V value = iterator.next();
if (iterator.hasNext()) {
throw new IllegalArgumentException("Duplicate values for " + key);
}
return value;
}
@Override
public int size() {
return size.get();
}
@Override
public Set<Entry<K, V>> entrySet() {
return new AbstractSet<Entry<K, V>>() {
@Override
public Iterator<Entry<K, V>> iterator() {
return FluentIterable.from(primitiveViewT.get())
.<Entry<K, V>>transform((K key) -> new SimpleEntry<>(key, get(key)))
.iterator();
}
@Override
public boolean contains(Object o) {
if (!(o instanceof Entry)) {
return false;
}
Entry<?, ?> entry = (Entry<?, ?>) o;
// We treat the absence of the key in the map as a difference in these abstract sets. The
// underlying primitive view represents missing keys as empty iterables so we use this
// to check if the map contains the key first before comparing values.
Iterable<V> value = primitiveViewT.get((K) entry.getKey());
if (value.iterator().hasNext()) {
return false;
}
return Objects.equals(entry.getValue(), value);
}
@Override
public int size() {
return size.get();
}
};
}
}
/** A {@link MultimapView} to {@link Map Map<K, Iterable<V>>} adapter. */
private static class MultimapViewToMultimapAdapter<K, V> extends AbstractMap<K, Iterable<V>> {
private final MultimapView<K, V> primitiveViewT;
private final Supplier<Integer> size;
private MultimapViewToMultimapAdapter(MultimapView<K, V> primitiveViewT) {
this.primitiveViewT = primitiveViewT;
this.size = Suppliers.memoize(() -> Iterables.size(primitiveViewT.get()));
}
@Override
public boolean containsKey(Object key) {
return primitiveViewT.get((K) key).iterator().hasNext();
}
@Override
public Iterable<V> get(Object key) {
Iterable<V> values = primitiveViewT.get((K) key);
// The only way for the values iterable to be empty is for us to have never seen such a key
// during materialization and hence we return 'null' to satisfy Java's Map contract.
if (values.iterator().hasNext()) {
return values;
} else {
return null;
}
}
@Override
public int size() {
return size.get();
}
@Override
public Set<Entry<K, Iterable<V>>> entrySet() {
return new AbstractSet<Entry<K, Iterable<V>>>() {
@Override
public Iterator<Entry<K, Iterable<V>>> iterator() {
return FluentIterable.from(primitiveViewT.get())
.<Entry<K, Iterable<V>>>transform(
(K key) -> new SimpleEntry<>(key, primitiveViewT.get(key)))
.iterator();
}
@Override
public boolean contains(Object o) {
if (!(o instanceof Entry)) {
return false;
}
Entry<?, ?> entry = (Entry<?, ?>) o;
if (!(entry.getValue() instanceof Iterable)) {
return false;
}
// We treat the absence of the key in the map as a difference in these abstract sets. The
// underlying primitive view represents missing keys as empty iterables so we use this
// to check if the map contains the key first before comparing values.
Iterable<V> value = primitiveViewT.get((K) entry.getKey());
if (value.iterator().hasNext()) {
return false;
}
return Iterables.elementsEqual((Iterable<?>) entry.getValue(), value);
}
@Override
public int size() {
return size.get();
}
};
}
}
}