Merge pull request #14712: Rollforward "Resolve beam_fn_api experiment for side inputs, using portable expansion always"
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java
index 369aad9..267b4d5 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java
@@ -25,14 +25,18 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -99,13 +103,22 @@
@Override
public PCollectionView<List<T>> expand(PCollection<T> input) {
- PCollection<KV<Void, T>> materializationInput =
- input.apply(new View.VoidKeyToMultimapMaterialization<>());
Coder<T> inputCoder = input.getCoder();
+ PCollection<KV<Long, PCollectionViews.ValueOrMetadata<T, OffsetRange>>> materializationInput =
+ input
+ .apply("IndexElements", ParDo.of(new View.ToListViewDoFn<>()))
+ .setCoder(
+ KvCoder.of(
+ BigEndianLongCoder.of(),
+ PCollectionViews.ValueOrMetadataCoder.create(
+ inputCoder, OffsetRange.Coder.of())));
PCollectionView<List<T>> view =
- PCollectionViews.listViewUsingVoidKey(
+ PCollectionViews.listView(
materializationInput,
- (TupleTag<Materializations.MultimapView<Void, T>>) originalView.getTagInternal(),
+ (TupleTag<
+ Materializations.MultimapView<
+ Long, PCollectionViews.ValueOrMetadata<T, OffsetRange>>>)
+ originalView.getTagInternal(),
(PCollectionViews.TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
materializationInput.apply(View.CreatePCollectionView.of(view));
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
index 37fa6fc..35d865f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
@@ -69,11 +69,17 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews.HasDefaultValue;
import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn;
+import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn2;
import org.apache.beam.sdk.values.PCollectionViews.ListViewFn;
+import org.apache.beam.sdk.values.PCollectionViews.ListViewFn2;
import org.apache.beam.sdk.values.PCollectionViews.MapViewFn;
+import org.apache.beam.sdk.values.PCollectionViews.MapViewFn2;
import org.apache.beam.sdk.values.PCollectionViews.MultimapViewFn;
+import org.apache.beam.sdk.values.PCollectionViews.MultimapViewFn2;
import org.apache.beam.sdk.values.PCollectionViews.SingletonViewFn;
+import org.apache.beam.sdk.values.PCollectionViews.SingletonViewFn2;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
@@ -104,7 +110,13 @@
private static final Object NULL_PLACE_HOLDER = new Object();
private static final ImmutableList<Class<? extends ViewFn>> KNOWN_SINGLETON_VIEW_TYPES =
- ImmutableList.of(SingletonViewFn.class, MapViewFn.class, MultimapViewFn.class);
+ ImmutableList.of(
+ SingletonViewFn.class,
+ SingletonViewFn2.class,
+ MapViewFn.class,
+ MapViewFn2.class,
+ MultimapViewFn.class,
+ MultimapViewFn2.class);
/**
* Limit the number of concurrent initializations.
@@ -302,7 +314,7 @@
// We handle the singleton case separately since a null value may be returned.
// We use a null place holder to represent this, and when we detect it, we translate
// back to null for the user.
- if (viewFn instanceof SingletonViewFn) {
+ if (viewFn instanceof SingletonViewFn || viewFn instanceof SingletonViewFn2) {
ViewT rval =
executionContext
.<PCollectionViewWindow<ViewT>, ViewT>getLogicalReferenceCache()
@@ -311,7 +323,7 @@
() -> {
@SuppressWarnings("unchecked")
ViewT viewT =
- getSingletonForWindow(tag, (SingletonViewFn<ViewT>) viewFn, window);
+ getSingletonForWindow(tag, (HasDefaultValue<ViewT>) viewFn, window);
@SuppressWarnings("unchecked")
ViewT nullPlaceHolder = (ViewT) NULL_PLACE_HOLDER;
return viewT == null ? nullPlaceHolder : viewT;
@@ -319,7 +331,10 @@
return rval == NULL_PLACE_HOLDER ? null : rval;
} else if (singletonMaterializedTags.contains(tag)) {
checkArgument(
- viewFn instanceof MapViewFn || viewFn instanceof MultimapViewFn,
+ viewFn instanceof MapViewFn
+ || viewFn instanceof MapViewFn2
+ || viewFn instanceof MultimapViewFn
+ || viewFn instanceof MultimapViewFn2,
"Unknown view type stored as singleton. Expected one of %s, got %s",
KNOWN_SINGLETON_VIEW_TYPES,
viewFn.getClass().getName());
@@ -336,15 +351,19 @@
.get(
PCollectionViewWindow.of(view, window),
() -> {
- if (viewFn instanceof IterableViewFn || viewFn instanceof ListViewFn) {
+ if (viewFn instanceof IterableViewFn
+ || viewFn instanceof IterableViewFn2
+ || viewFn instanceof ListViewFn
+ || viewFn instanceof ListViewFn2) {
@SuppressWarnings("unchecked")
ViewT viewT = (ViewT) getListForWindow(tag, window);
return viewT;
- } else if (viewFn instanceof MapViewFn) {
+ } else if (viewFn instanceof MapViewFn || viewFn instanceof MapViewFn2) {
@SuppressWarnings("unchecked")
ViewT viewT = (ViewT) getMapForWindow(tag, window);
return viewT;
- } else if (viewFn instanceof MultimapViewFn) {
+ } else if (viewFn instanceof MultimapViewFn
+ || viewFn instanceof MultimapViewFn2) {
@SuppressWarnings("unchecked")
ViewT viewT = (ViewT) getMultimapForWindow(tag, window);
return viewT;
@@ -375,7 +394,7 @@
* </ul>
*/
private <T, W extends BoundedWindow> T getSingletonForWindow(
- TupleTag<?> viewTag, SingletonViewFn<T> viewFn, W window) throws IOException {
+ TupleTag<?> viewTag, HasDefaultValue<T> viewFn, W window) throws IOException {
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"unchecked"
diff --git a/runners/portability/java/build.gradle b/runners/portability/java/build.gradle
index bce5404..4ec158d 100644
--- a/runners/portability/java/build.gradle
+++ b/runners/portability/java/build.gradle
@@ -192,6 +192,13 @@
// https://issues.apache.org/jira/browse/BEAM-10452
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode'
+ // https://issues.apache.org/jira/browse/BEAM-12275
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$MultipleInputsAndOutputTests.testSideInputAnnotationWithMultipleSideInputs'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testMapAsEntrySetSideInput'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedMultimapAsEntrySetSideInput'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedMapAsEntrySetSideInput'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testMultimapAsEntrySetSideInput'
+
// https://issues.apache.org/jira/browse/BEAM-10995
excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testWindowPreservation'
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 63b4191..a3b5665 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.transforms;
-import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.io.IOException;
@@ -47,7 +46,6 @@
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.transforms.View.VoidKeyToMultimapMaterialization;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -1308,43 +1306,21 @@
@Override
public PCollectionView<OutputT> expand(PCollection<InputT> input) {
- // TODO(BEAM-10097): Make this the default expansion for all portable runners.
- if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
- && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
- || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
- PCollection<OutputT> combined =
- input.apply(
- "CombineValues",
- Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
- Coder<OutputT> outputCoder = combined.getCoder();
- PCollectionView<OutputT> view =
- PCollectionViews.singletonView(
- combined,
- (TypeDescriptorSupplier<OutputT>)
- () -> outputCoder != null ? outputCoder.getEncodedTypeDescriptor() : null,
- input.getWindowingStrategy(),
- insertDefault,
- insertDefault ? fn.defaultValue() : null,
- combined.getCoder());
- combined.apply("CreatePCollectionView", CreatePCollectionView.of(view));
- return view;
- }
-
PCollection<OutputT> combined =
- input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
- PCollection<KV<Void, OutputT>> materializationInput =
- combined.apply(new VoidKeyToMultimapMaterialization<>());
+ input.apply(
+ "CombineValues",
+ Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
Coder<OutputT> outputCoder = combined.getCoder();
PCollectionView<OutputT> view =
- PCollectionViews.singletonViewUsingVoidKey(
- materializationInput,
+ PCollectionViews.singletonView(
+ combined,
(TypeDescriptorSupplier<OutputT>)
() -> outputCoder != null ? outputCoder.getEncodedTypeDescriptor() : null,
input.getWindowingStrategy(),
insertDefault,
insertDefault ? fn.defaultValue() : null,
combined.getCoder());
- materializationInput.apply(CreatePCollectionView.of(view));
+ combined.apply("CreatePCollectionView", CreatePCollectionView.of(view));
return view;
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 904575c..e81f0b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.transforms;
-import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
-
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,7 +27,6 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -260,33 +257,16 @@
* Long#MIN_VALUE} key is used to store all known {@link OffsetRange ranges} allowing us to
* compute such an ordering.
*/
-
- // TODO(BEAM-10097): Make this the default expansion for all portable runners.
- if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
- && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
- || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
- Coder<T> inputCoder = input.getCoder();
- PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> materializationInput =
- input
- .apply("IndexElements", ParDo.of(new ToListViewDoFn<>()))
- .setCoder(
- KvCoder.of(
- BigEndianLongCoder.of(),
- ValueOrMetadataCoder.create(inputCoder, OffsetRange.Coder.of())));
- PCollectionView<List<T>> view =
- PCollectionViews.listView(
- materializationInput,
- (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
- input.getWindowingStrategy());
- materializationInput.apply(CreatePCollectionView.of(view));
- return view;
- }
-
- PCollection<KV<Void, T>> materializationInput =
- input.apply(new VoidKeyToMultimapMaterialization<>());
Coder<T> inputCoder = input.getCoder();
+ PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> materializationInput =
+ input
+ .apply("IndexElements", ParDo.of(new ToListViewDoFn<>()))
+ .setCoder(
+ KvCoder.of(
+ BigEndianLongCoder.of(),
+ ValueOrMetadataCoder.create(inputCoder, OffsetRange.Coder.of())));
PCollectionView<List<T>> view =
- PCollectionViews.listViewUsingVoidKey(
+ PCollectionViews.listView(
materializationInput,
(TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
@@ -300,8 +280,8 @@
* range for each window seen. We use random offset ranges to minimize the chance that two ranges
* overlap increasing the odds that each "key" represents a single index.
*/
- private static class ToListViewDoFn<T>
- extends DoFn<T, KV<Long, ValueOrMetadata<T, OffsetRange>>> {
+ @Internal
+ public static class ToListViewDoFn<T> extends DoFn<T, KV<Long, ValueOrMetadata<T, OffsetRange>>> {
private Map<BoundedWindow, OffsetRange> windowsToOffsets = new HashMap<>();
private OffsetRange generateRange(BoundedWindow window) {
@@ -350,29 +330,19 @@
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- // TODO(BEAM-10097): Make this the default expansion for all portable runners.
- if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
- && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
- || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
- Coder<T> inputCoder = input.getCoder();
- PCollectionView<Iterable<T>> view =
- PCollectionViews.iterableView(
- input,
- (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
- input.getWindowingStrategy());
- input.apply(CreatePCollectionView.of(view));
- return view;
- }
-
- PCollection<KV<Void, T>> materializationInput =
- input.apply(new VoidKeyToMultimapMaterialization<>());
Coder<T> inputCoder = input.getCoder();
+ // HACK to work around https://issues.apache.org/jira/browse/BEAM-12228:
+ // There are bugs in "composite" vs "primitive" transform distinction
+ // in TransformHierarchy. This noop transform works around them and should be zero
+ // cost.
+ PCollection<T> materializationInput =
+ input.apply(MapElements.via(new SimpleFunction<T, T>(x -> x) {}));
PCollectionView<Iterable<T>> view =
- PCollectionViews.iterableViewUsingVoidKey(
+ PCollectionViews.iterableView(
materializationInput,
(TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
- materializationInput.apply(CreatePCollectionView.of(view));
+ input.apply(CreatePCollectionView.of(view));
return view;
}
}
@@ -508,35 +478,22 @@
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- // TODO(BEAM-10097): Make this the default expansion for all portable runners.
- if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
- && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
- || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
- KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
- Coder<K> keyCoder = kvCoder.getKeyCoder();
- Coder<V> valueCoder = kvCoder.getValueCoder();
- PCollectionView<Map<K, Iterable<V>>> view =
- PCollectionViews.multimapView(
- input,
- (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
- (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
- input.getWindowingStrategy());
- input.apply(CreatePCollectionView.of(view));
- return view;
- }
-
KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
Coder<K> keyCoder = kvCoder.getKeyCoder();
Coder<V> valueCoder = kvCoder.getValueCoder();
- PCollection<KV<Void, KV<K, V>>> materializationInput =
- input.apply(new VoidKeyToMultimapMaterialization<>());
+ // HACK to work around https://issues.apache.org/jira/browse/BEAM-12228:
+ // There are bugs in "composite" vs "primitive" transform distinction
+ // in TransformHierarchy. This noop transform works around them and should be zero
+ // cost.
+ PCollection<KV<K, V>> materializationInput =
+ input.apply(MapElements.via(new SimpleFunction<KV<K, V>, KV<K, V>>(x -> x) {}));
PCollectionView<Map<K, Iterable<V>>> view =
- PCollectionViews.multimapViewUsingVoidKey(
+ PCollectionViews.multimapView(
materializationInput,
(TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
(TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
- materializationInput.apply(CreatePCollectionView.of(view));
+ input.apply(CreatePCollectionView.of(view));
return view;
}
}
@@ -567,37 +524,19 @@
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- // TODO(BEAM-10097): Make this the default expansion for all portable runners.
- if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
- && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
- || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
- KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
- Coder<K> keyCoder = kvCoder.getKeyCoder();
- Coder<V> valueCoder = kvCoder.getValueCoder();
-
- PCollectionView<Map<K, V>> view =
- PCollectionViews.mapView(
- input,
- (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
- (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
- input.getWindowingStrategy());
- input.apply(CreatePCollectionView.of(view));
- return view;
- }
-
KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
Coder<K> keyCoder = kvCoder.getKeyCoder();
Coder<V> valueCoder = kvCoder.getValueCoder();
- PCollection<KV<Void, KV<K, V>>> materializationInput =
- input.apply(new VoidKeyToMultimapMaterialization<>());
+ PCollection<KV<K, V>> materializationInput =
+ input.apply(MapElements.via(new SimpleFunction<KV<K, V>, KV<K, V>>(x -> x) {}));
PCollectionView<Map<K, V>> view =
- PCollectionViews.mapViewUsingVoidKey(
+ PCollectionViews.mapView(
materializationInput,
(TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
(TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
- materializationInput.apply(CreatePCollectionView.of(view));
+ input.apply(CreatePCollectionView.of(view));
return view;
}
}
@@ -606,34 +545,11 @@
// Internal details below
/**
- * A {@link PTransform} which converts all values into {@link KV}s with {@link Void} keys.
- *
- * <p>TODO(BEAM-10097): Replace this materialization with specializations that optimize the
- * various SDK requested views.
- */
- @Internal
- public static class VoidKeyToMultimapMaterialization<T>
- extends PTransform<PCollection<T>, PCollection<KV<Void, T>>> {
-
- private static class VoidKeyToMultimapMaterializationDoFn<T> extends DoFn<T, KV<Void, T>> {
- @ProcessElement
- public void processElement(@Element T element, OutputReceiver<KV<Void, T>> r) {
- r.output(KV.of((Void) null, element));
- }
- }
-
- @Override
- public PCollection<KV<Void, T>> expand(PCollection<T> input) {
- PCollection output = input.apply(ParDo.of(new VoidKeyToMultimapMaterializationDoFn<>()));
- output.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
- return output;
- }
- }
-
- /**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
- * <p>Creates a primitive {@link PCollectionView}.
+ * <p>Placeholder transform for runners to have a hook to materialize a {@link PCollection} as a
+ * side input. The metadata included in the {@link PCollectionView} is how the {@link PCollection}
+ * will be read as a side input.
*
* @param <ElemT> The type of the elements of the input PCollection
* @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index df88e21..360c1af 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -120,6 +120,7 @@
*/
@Deprecated
public static <T, W extends BoundedWindow> PCollectionView<T> singletonViewUsingVoidKey(
+ TupleTag<MultimapView<Void, T>> tag,
PCollection<KV<Void, T>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy,
@@ -128,6 +129,7 @@
Coder<T> defaultValueCoder) {
return new SimplePCollectionView<>(
pCollection,
+ tag,
new SingletonViewFn<>(hasDefault, defaultValue, defaultValueCoder, typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
@@ -156,11 +158,13 @@
*/
@Deprecated
public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableViewUsingVoidKey(
+ TupleTag<MultimapView<Void, T>> tag,
PCollection<KV<Void, T>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
+ tag,
new IterableViewFn<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
@@ -184,16 +188,35 @@
/**
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
+ */
+ public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
+ PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> pCollection,
+ TupleTag<Materializations.MultimapView<Long, ValueOrMetadata<T, OffsetRange>>> tag,
+ TypeDescriptorSupplier<T> typeDescriptorSupplier,
+ WindowingStrategy<?, W> windowingStrategy) {
+ return new SimplePCollectionView<>(
+ pCollection,
+ tag,
+ new ListViewFn2<>(typeDescriptorSupplier),
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+ windowingStrategy);
+ }
+
+ /**
+ * Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
+ * provided {@link WindowingStrategy}.
*
* @deprecated See {@link #listView}.
*/
@Deprecated
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewUsingVoidKey(
+ TupleTag<MultimapView<Void, T>> tag,
PCollection<KV<Void, T>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
+ tag,
new ListViewFn<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
@@ -243,12 +266,14 @@
*/
@Deprecated
public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapViewUsingVoidKey(
+ TupleTag<MultimapView<Void, KV<K, V>>> tag,
PCollection<KV<Void, KV<K, V>>> pCollection,
TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
TypeDescriptorSupplier<V> valueTypeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
+ tag,
new MapViewFn<>(keyTypeDescriptorSupplier, valueTypeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
@@ -279,12 +304,14 @@
@Deprecated
public static <K, V, W extends BoundedWindow>
PCollectionView<Map<K, Iterable<V>>> multimapViewUsingVoidKey(
+ TupleTag<MultimapView<Void, KV<K, V>>> tag,
PCollection<KV<Void, KV<K, V>>> pCollection,
TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
TypeDescriptorSupplier<V> valueTypeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
+ tag,
new MultimapViewFn<>(keyTypeDescriptorSupplier, valueTypeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
@@ -312,7 +339,9 @@
* <p>{@link SingletonViewFn} is meant to be removed in the future and replaced with this class.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
- private static class SingletonViewFn2<T> extends ViewFn<IterableView<T>, T> {
+ @Internal
+ public static class SingletonViewFn2<T> extends ViewFn<IterableView<T>, T>
+ implements HasDefaultValue<T> {
private byte @Nullable [] encodedDefaultValue;
private transient @Nullable T defaultValue;
private @Nullable Coder<T> valueCoder;
@@ -350,6 +379,7 @@
*
* @throws NoSuchElementException if no default was specified.
*/
+ @Override
public T getDefaultValue() {
if (!hasDefault) {
throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
@@ -393,6 +423,11 @@
}
}
+ @Internal
+ public interface HasDefaultValue<T> {
+ T getDefaultValue();
+ }
+
/**
* Implementation which is able to adapt a multimap materialization to a {@code T}.
*
@@ -402,7 +437,8 @@
*/
@Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
- public static class SingletonViewFn<T> extends ViewFn<MultimapView<Void, T>, T> {
+ public static class SingletonViewFn<T> extends ViewFn<MultimapView<Void, T>, T>
+ implements HasDefaultValue<T> {
private byte @Nullable [] encodedDefaultValue;
private transient @Nullable T defaultValue;
private @Nullable Coder<T> valueCoder;
@@ -440,6 +476,7 @@
*
* @throws NoSuchElementException if no default was specified.
*/
+ @Override
public T getDefaultValue() {
if (!hasDefault) {
throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
@@ -493,7 +530,8 @@
* <p>{@link IterableViewFn} is meant to be removed in the future and replaced with this class.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
- private static class IterableViewFn2<T> extends ViewFn<IterableView<T>, Iterable<T>> {
+ @Internal
+ public static class IterableViewFn2<T> extends ViewFn<IterableView<T>, Iterable<T>> {
private TypeDescriptorSupplier<T> typeDescriptorSupplier;
public IterableViewFn2(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
@@ -559,7 +597,7 @@
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
@VisibleForTesting
- static class ListViewFn2<T>
+ public static class ListViewFn2<T>
extends ViewFn<MultimapView<Long, ValueOrMetadata<T, OffsetRange>>, List<T>> {
private TypeDescriptorSupplier<T> typeDescriptorSupplier;
@@ -1003,7 +1041,8 @@
* <p>{@link MultimapViewFn} is meant to be removed in the future and replaced with this class.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
- private static class MultimapViewFn2<K, V>
+ @Internal
+ public static class MultimapViewFn2<K, V>
extends ViewFn<MultimapView<K, V>, Map<K, Iterable<V>>> {
private TypeDescriptorSupplier<K> keyTypeDescriptorSupplier;
private TypeDescriptorSupplier<V> valueTypeDescriptorSupplier;
@@ -1091,7 +1130,8 @@
*
* <p>{@link MapViewFn} is meant to be removed in the future and replaced with this class.
*/
- private static class MapViewFn2<K, V> extends ViewFn<MultimapView<K, V>, Map<K, V>> {
+ @Internal
+ public static class MapViewFn2<K, V> extends ViewFn<MultimapView<K, V>, Map<K, V>> {
private TypeDescriptorSupplier<K> keyTypeDescriptorSupplier;
private TypeDescriptorSupplier<V> valueTypeDescriptorSupplier;
@@ -1279,7 +1319,13 @@
@Override
public String toString() {
- return MoreObjects.toStringHelper(this).add("tag", tag).toString();
+ return MoreObjects.toStringHelper(this)
+ .add("tag", tag)
+ .add("viewFn", viewFn)
+ .add("coder", coder)
+ .add("windowMappingFn", windowMappingFn)
+ .add("pCollection", pCollection)
+ .toString();
}
@Override
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index b340d91..61b4bf8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.testing;
-import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -42,82 +40,42 @@
// materializations will differ but test code should not worry about what these look like if
// they are relying on the ViewFn to "undo" the conversion.
- // TODO(BEAM-10097): Make this the default case once all portable runners can support
- // the iterable access pattern.
- if (hasExperiment(options, "beam_fn_api")
- && (hasExperiment(options, "use_runner_v2")
- || hasExperiment(options, "use_unified_worker"))) {
- if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
- for (Object value : values) {
- rval.add(value);
+ if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(value);
+ }
+ } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(value);
+ }
+ } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
+ if (values.length > 0) {
+ rval.add(
+ KV.of(
+ Long.MIN_VALUE, ValueOrMetadata.createMetadata(new OffsetRange(0, values.length))));
+ for (int i = 0; i < values.length; ++i) {
+ rval.add(KV.of((long) i, ValueOrMetadata.create(values[i])));
}
- } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
- for (Object value : values) {
- rval.add(value);
- }
- } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
- if (values.length > 0) {
- rval.add(
- KV.of(
- Long.MIN_VALUE,
- ValueOrMetadata.createMetadata(new OffsetRange(0, values.length))));
- for (int i = 0; i < values.length; ++i) {
- rval.add(KV.of((long) i, ValueOrMetadata.create(values[i])));
- }
- }
- } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
- for (Object value : values) {
- rval.add(value);
- }
- } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
- for (Object value : values) {
- rval.add(value);
- }
- } else {
- throw new IllegalArgumentException(
- String.format(
- "Unknown type of view %s. Supported views are %s.",
- viewTransformClass.getClass(),
- ImmutableSet.of(
- View.AsSingleton.class,
- View.AsIterable.class,
- View.AsList.class,
- View.AsMap.class,
- View.AsMultimap.class)));
+ }
+ } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(value);
+ }
+ } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(value);
}
} else {
- if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
- for (Object value : values) {
- rval.add(KV.of(null, value));
- }
- } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
- for (Object value : values) {
- rval.add(KV.of(null, value));
- }
- } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
- for (Object value : values) {
- rval.add(KV.of(null, value));
- }
- } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
- for (Object value : values) {
- rval.add(KV.of(null, value));
- }
- } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
- for (Object value : values) {
- rval.add(KV.of(null, value));
- }
- } else {
- throw new IllegalArgumentException(
- String.format(
- "Unknown type of view %s. Supported views are %s.",
- viewTransformClass.getClass(),
- ImmutableSet.of(
- View.AsSingleton.class,
- View.AsIterable.class,
- View.AsList.class,
- View.AsMap.class,
- View.AsMultimap.class)));
- }
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown type of view %s. Supported views are %s.",
+ viewTransformClass.getClass(),
+ ImmutableSet.of(
+ View.AsSingleton.class,
+ View.AsIterable.class,
+ View.AsList.class,
+ View.AsMap.class,
+ View.AsMultimap.class)));
}
return Collections.unmodifiableList(rval);
}