[BEAM-11839] Add reason for most Singleton() requirements (#14569)
[BEAM-11839] Add reason for most Singleton() requirements
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java
index 267b4d5..369aad9 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java
@@ -25,18 +25,14 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -103,22 +99,13 @@
@Override
public PCollectionView<List<T>> expand(PCollection<T> input) {
+ PCollection<KV<Void, T>> materializationInput =
+ input.apply(new View.VoidKeyToMultimapMaterialization<>());
Coder<T> inputCoder = input.getCoder();
- PCollection<KV<Long, PCollectionViews.ValueOrMetadata<T, OffsetRange>>> materializationInput =
- input
- .apply("IndexElements", ParDo.of(new View.ToListViewDoFn<>()))
- .setCoder(
- KvCoder.of(
- BigEndianLongCoder.of(),
- PCollectionViews.ValueOrMetadataCoder.create(
- inputCoder, OffsetRange.Coder.of())));
PCollectionView<List<T>> view =
- PCollectionViews.listView(
+ PCollectionViews.listViewUsingVoidKey(
materializationInput,
- (TupleTag<
- Materializations.MultimapView<
- Long, PCollectionViews.ValueOrMetadata<T, OffsetRange>>>)
- originalView.getTagInternal(),
+ (TupleTag<Materializations.MultimapView<Void, T>>) originalView.getTagInternal(),
(PCollectionViews.TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
materializationInput.apply(View.CreatePCollectionView.of(view));
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
index 35d865f..37fa6fc 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
@@ -69,17 +69,11 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews.HasDefaultValue;
import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn;
-import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn2;
import org.apache.beam.sdk.values.PCollectionViews.ListViewFn;
-import org.apache.beam.sdk.values.PCollectionViews.ListViewFn2;
import org.apache.beam.sdk.values.PCollectionViews.MapViewFn;
-import org.apache.beam.sdk.values.PCollectionViews.MapViewFn2;
import org.apache.beam.sdk.values.PCollectionViews.MultimapViewFn;
-import org.apache.beam.sdk.values.PCollectionViews.MultimapViewFn2;
import org.apache.beam.sdk.values.PCollectionViews.SingletonViewFn;
-import org.apache.beam.sdk.values.PCollectionViews.SingletonViewFn2;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
@@ -110,13 +104,7 @@
private static final Object NULL_PLACE_HOLDER = new Object();
private static final ImmutableList<Class<? extends ViewFn>> KNOWN_SINGLETON_VIEW_TYPES =
- ImmutableList.of(
- SingletonViewFn.class,
- SingletonViewFn2.class,
- MapViewFn.class,
- MapViewFn2.class,
- MultimapViewFn.class,
- MultimapViewFn2.class);
+ ImmutableList.of(SingletonViewFn.class, MapViewFn.class, MultimapViewFn.class);
/**
* Limit the number of concurrent initializations.
@@ -314,7 +302,7 @@
// We handle the singleton case separately since a null value may be returned.
// We use a null place holder to represent this, and when we detect it, we translate
// back to null for the user.
- if (viewFn instanceof SingletonViewFn || viewFn instanceof SingletonViewFn2) {
+ if (viewFn instanceof SingletonViewFn) {
ViewT rval =
executionContext
.<PCollectionViewWindow<ViewT>, ViewT>getLogicalReferenceCache()
@@ -323,7 +311,7 @@
() -> {
@SuppressWarnings("unchecked")
ViewT viewT =
- getSingletonForWindow(tag, (HasDefaultValue<ViewT>) viewFn, window);
+ getSingletonForWindow(tag, (SingletonViewFn<ViewT>) viewFn, window);
@SuppressWarnings("unchecked")
ViewT nullPlaceHolder = (ViewT) NULL_PLACE_HOLDER;
return viewT == null ? nullPlaceHolder : viewT;
@@ -331,10 +319,7 @@
return rval == NULL_PLACE_HOLDER ? null : rval;
} else if (singletonMaterializedTags.contains(tag)) {
checkArgument(
- viewFn instanceof MapViewFn
- || viewFn instanceof MapViewFn2
- || viewFn instanceof MultimapViewFn
- || viewFn instanceof MultimapViewFn2,
+ viewFn instanceof MapViewFn || viewFn instanceof MultimapViewFn,
"Unknown view type stored as singleton. Expected one of %s, got %s",
KNOWN_SINGLETON_VIEW_TYPES,
viewFn.getClass().getName());
@@ -351,19 +336,15 @@
.get(
PCollectionViewWindow.of(view, window),
() -> {
- if (viewFn instanceof IterableViewFn
- || viewFn instanceof IterableViewFn2
- || viewFn instanceof ListViewFn
- || viewFn instanceof ListViewFn2) {
+ if (viewFn instanceof IterableViewFn || viewFn instanceof ListViewFn) {
@SuppressWarnings("unchecked")
ViewT viewT = (ViewT) getListForWindow(tag, window);
return viewT;
- } else if (viewFn instanceof MapViewFn || viewFn instanceof MapViewFn2) {
+ } else if (viewFn instanceof MapViewFn) {
@SuppressWarnings("unchecked")
ViewT viewT = (ViewT) getMapForWindow(tag, window);
return viewT;
- } else if (viewFn instanceof MultimapViewFn
- || viewFn instanceof MultimapViewFn2) {
+ } else if (viewFn instanceof MultimapViewFn) {
@SuppressWarnings("unchecked")
ViewT viewT = (ViewT) getMultimapForWindow(tag, window);
return viewT;
@@ -394,7 +375,7 @@
* </ul>
*/
private <T, W extends BoundedWindow> T getSingletonForWindow(
- TupleTag<?> viewTag, HasDefaultValue<T> viewFn, W window) throws IOException {
+ TupleTag<?> viewTag, SingletonViewFn<T> viewFn, W window) throws IOException {
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"unchecked"
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index a3b5665..63b4191 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.transforms;
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.io.IOException;
@@ -46,6 +47,7 @@
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.View.VoidKeyToMultimapMaterialization;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -1306,21 +1308,43 @@
@Override
public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+ // TODO(BEAM-10097): Make this the default expansion for all portable runners.
+ if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+ && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
+ || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
+ PCollection<OutputT> combined =
+ input.apply(
+ "CombineValues",
+ Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
+ Coder<OutputT> outputCoder = combined.getCoder();
+ PCollectionView<OutputT> view =
+ PCollectionViews.singletonView(
+ combined,
+ (TypeDescriptorSupplier<OutputT>)
+ () -> outputCoder != null ? outputCoder.getEncodedTypeDescriptor() : null,
+ input.getWindowingStrategy(),
+ insertDefault,
+ insertDefault ? fn.defaultValue() : null,
+ combined.getCoder());
+ combined.apply("CreatePCollectionView", CreatePCollectionView.of(view));
+ return view;
+ }
+
PCollection<OutputT> combined =
- input.apply(
- "CombineValues",
- Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
+ input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
+ PCollection<KV<Void, OutputT>> materializationInput =
+ combined.apply(new VoidKeyToMultimapMaterialization<>());
Coder<OutputT> outputCoder = combined.getCoder();
PCollectionView<OutputT> view =
- PCollectionViews.singletonView(
- combined,
+ PCollectionViews.singletonViewUsingVoidKey(
+ materializationInput,
(TypeDescriptorSupplier<OutputT>)
() -> outputCoder != null ? outputCoder.getEncodedTypeDescriptor() : null,
input.getWindowingStrategy(),
insertDefault,
insertDefault ? fn.defaultValue() : null,
combined.getCoder());
- combined.apply("CreatePCollectionView", CreatePCollectionView.of(view));
+ materializationInput.apply(CreatePCollectionView.of(view));
return view;
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index e81f0b8..904575c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.transforms;
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -27,6 +29,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -257,16 +260,33 @@
* Long#MIN_VALUE} key is used to store all known {@link OffsetRange ranges} allowing us to
* compute such an ordering.
*/
+
+ // TODO(BEAM-10097): Make this the default expansion for all portable runners.
+ if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+ && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
+ || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
+ Coder<T> inputCoder = input.getCoder();
+ PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> materializationInput =
+ input
+ .apply("IndexElements", ParDo.of(new ToListViewDoFn<>()))
+ .setCoder(
+ KvCoder.of(
+ BigEndianLongCoder.of(),
+ ValueOrMetadataCoder.create(inputCoder, OffsetRange.Coder.of())));
+ PCollectionView<List<T>> view =
+ PCollectionViews.listView(
+ materializationInput,
+ (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
+ input.getWindowingStrategy());
+ materializationInput.apply(CreatePCollectionView.of(view));
+ return view;
+ }
+
+ PCollection<KV<Void, T>> materializationInput =
+ input.apply(new VoidKeyToMultimapMaterialization<>());
Coder<T> inputCoder = input.getCoder();
- PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> materializationInput =
- input
- .apply("IndexElements", ParDo.of(new ToListViewDoFn<>()))
- .setCoder(
- KvCoder.of(
- BigEndianLongCoder.of(),
- ValueOrMetadataCoder.create(inputCoder, OffsetRange.Coder.of())));
PCollectionView<List<T>> view =
- PCollectionViews.listView(
+ PCollectionViews.listViewUsingVoidKey(
materializationInput,
(TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
@@ -280,8 +300,8 @@
* range for each window seen. We use random offset ranges to minimize the chance that two ranges
* overlap increasing the odds that each "key" represents a single index.
*/
- @Internal
- public static class ToListViewDoFn<T> extends DoFn<T, KV<Long, ValueOrMetadata<T, OffsetRange>>> {
+ private static class ToListViewDoFn<T>
+ extends DoFn<T, KV<Long, ValueOrMetadata<T, OffsetRange>>> {
private Map<BoundedWindow, OffsetRange> windowsToOffsets = new HashMap<>();
private OffsetRange generateRange(BoundedWindow window) {
@@ -330,19 +350,29 @@
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
+ // TODO(BEAM-10097): Make this the default expansion for all portable runners.
+ if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+ && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
+ || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
+ Coder<T> inputCoder = input.getCoder();
+ PCollectionView<Iterable<T>> view =
+ PCollectionViews.iterableView(
+ input,
+ (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
+ input.getWindowingStrategy());
+ input.apply(CreatePCollectionView.of(view));
+ return view;
+ }
+
+ PCollection<KV<Void, T>> materializationInput =
+ input.apply(new VoidKeyToMultimapMaterialization<>());
Coder<T> inputCoder = input.getCoder();
- // HACK to work around https://issues.apache.org/jira/browse/BEAM-12228:
- // There are bugs in "composite" vs "primitive" transform distinction
- // in TransformHierachy. This noop transform works around them and should be zero
- // cost.
- PCollection<T> materializationInput =
- input.apply(MapElements.via(new SimpleFunction<T, T>(x -> x) {}));
PCollectionView<Iterable<T>> view =
- PCollectionViews.iterableView(
+ PCollectionViews.iterableViewUsingVoidKey(
materializationInput,
(TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
- input.apply(CreatePCollectionView.of(view));
+ materializationInput.apply(CreatePCollectionView.of(view));
return view;
}
}
@@ -478,22 +508,35 @@
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
+ // TODO(BEAM-10097): Make this the default expansion for all portable runners.
+ if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+ && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
+ || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
+ KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
+ Coder<K> keyCoder = kvCoder.getKeyCoder();
+ Coder<V> valueCoder = kvCoder.getValueCoder();
+ PCollectionView<Map<K, Iterable<V>>> view =
+ PCollectionViews.multimapView(
+ input,
+ (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
+ (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
+ input.getWindowingStrategy());
+ input.apply(CreatePCollectionView.of(view));
+ return view;
+ }
+
KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
Coder<K> keyCoder = kvCoder.getKeyCoder();
Coder<V> valueCoder = kvCoder.getValueCoder();
- // HACK to work around https://issues.apache.org/jira/browse/BEAM-12228:
- // There are bugs in "composite" vs "primitive" transform distinction
- // in TransformHierachy. This noop transform works around them and should be zero
- // cost.
- PCollection<KV<K, V>> materializationInput =
- input.apply(MapElements.via(new SimpleFunction<KV<K, V>, KV<K, V>>(x -> x) {}));
+ PCollection<KV<Void, KV<K, V>>> materializationInput =
+ input.apply(new VoidKeyToMultimapMaterialization<>());
PCollectionView<Map<K, Iterable<V>>> view =
- PCollectionViews.multimapView(
+ PCollectionViews.multimapViewUsingVoidKey(
materializationInput,
(TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
(TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
- input.apply(CreatePCollectionView.of(view));
+ materializationInput.apply(CreatePCollectionView.of(view));
return view;
}
}
@@ -524,19 +567,37 @@
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
+ // TODO(BEAM-10097): Make this the default expansion for all portable runners.
+ if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+ && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
+ || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
+ KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
+ Coder<K> keyCoder = kvCoder.getKeyCoder();
+ Coder<V> valueCoder = kvCoder.getValueCoder();
+
+ PCollectionView<Map<K, V>> view =
+ PCollectionViews.mapView(
+ input,
+ (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
+ (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
+ input.getWindowingStrategy());
+ input.apply(CreatePCollectionView.of(view));
+ return view;
+ }
+
KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
Coder<K> keyCoder = kvCoder.getKeyCoder();
Coder<V> valueCoder = kvCoder.getValueCoder();
- PCollection<KV<K, V>> materializationInput =
- input.apply(MapElements.via(new SimpleFunction<KV<K, V>, KV<K, V>>(x -> x) {}));
+ PCollection<KV<Void, KV<K, V>>> materializationInput =
+ input.apply(new VoidKeyToMultimapMaterialization<>());
PCollectionView<Map<K, V>> view =
- PCollectionViews.mapView(
+ PCollectionViews.mapViewUsingVoidKey(
materializationInput,
(TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
(TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
- input.apply(CreatePCollectionView.of(view));
+ materializationInput.apply(CreatePCollectionView.of(view));
return view;
}
}
@@ -545,11 +606,34 @@
// Internal details below
/**
+ * A {@link PTransform} which converts all values into {@link KV}s with {@link Void} keys.
+ *
+ * <p>TODO(BEAM-10097): Replace this materialization with specializations that optimize the
+ * various SDK-requested views.
+ */
+ @Internal
+ public static class VoidKeyToMultimapMaterialization<T>
+ extends PTransform<PCollection<T>, PCollection<KV<Void, T>>> {
+
+ private static class VoidKeyToMultimapMaterializationDoFn<T> extends DoFn<T, KV<Void, T>> {
+ @ProcessElement
+ public void processElement(@Element T element, OutputReceiver<KV<Void, T>> r) {
+ r.output(KV.of((Void) null, element));
+ }
+ }
+
+ @Override
+ public PCollection<KV<Void, T>> expand(PCollection<T> input) {
+ PCollection output = input.apply(ParDo.of(new VoidKeyToMultimapMaterializationDoFn<>()));
+ output.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
+ return output;
+ }
+ }
+
+ /**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
- * <p>Placeholder transform for runners to have a hook to materialize a {@link PCollection} as a
- * side input. The metadata included in the {@link PCollectionView} is how the {@link PCollection}
- * will be read as a side input.
+ * <p>Creates a primitive {@link PCollectionView}.
*
* @param <ElemT> The type of the elements of the input PCollection
* @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index 360c1af..df88e21 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -120,7 +120,6 @@
*/
@Deprecated
public static <T, W extends BoundedWindow> PCollectionView<T> singletonViewUsingVoidKey(
- TupleTag<MultimapView<Void, T>> tag,
PCollection<KV<Void, T>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy,
@@ -129,7 +128,6 @@
Coder<T> defaultValueCoder) {
return new SimplePCollectionView<>(
pCollection,
- tag,
new SingletonViewFn<>(hasDefault, defaultValue, defaultValueCoder, typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
@@ -158,13 +156,11 @@
*/
@Deprecated
public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableViewUsingVoidKey(
- TupleTag<MultimapView<Void, T>> tag,
PCollection<KV<Void, T>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
- tag,
new IterableViewFn<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
@@ -188,35 +184,16 @@
/**
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
- */
- public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
- PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> pCollection,
- TupleTag<Materializations.MultimapView<Long, ValueOrMetadata<T, OffsetRange>>> tag,
- TypeDescriptorSupplier<T> typeDescriptorSupplier,
- WindowingStrategy<?, W> windowingStrategy) {
- return new SimplePCollectionView<>(
- pCollection,
- tag,
- new ListViewFn2<>(typeDescriptorSupplier),
- windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy);
- }
-
- /**
- * Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
- * provided {@link WindowingStrategy}.
*
* @deprecated See {@link #listView}.
*/
@Deprecated
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewUsingVoidKey(
- TupleTag<MultimapView<Void, T>> tag,
PCollection<KV<Void, T>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
- tag,
new ListViewFn<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
@@ -266,14 +243,12 @@
*/
@Deprecated
public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapViewUsingVoidKey(
- TupleTag<MultimapView<Void, KV<K, V>>> tag,
PCollection<KV<Void, KV<K, V>>> pCollection,
TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
TypeDescriptorSupplier<V> valueTypeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
- tag,
new MapViewFn<>(keyTypeDescriptorSupplier, valueTypeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
@@ -304,14 +279,12 @@
@Deprecated
public static <K, V, W extends BoundedWindow>
PCollectionView<Map<K, Iterable<V>>> multimapViewUsingVoidKey(
- TupleTag<MultimapView<Void, KV<K, V>>> tag,
PCollection<KV<Void, KV<K, V>>> pCollection,
TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
TypeDescriptorSupplier<V> valueTypeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
- tag,
new MultimapViewFn<>(keyTypeDescriptorSupplier, valueTypeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
@@ -339,9 +312,7 @@
* <p>{@link SingletonViewFn} is meant to be removed in the future and replaced with this class.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
- @Internal
- public static class SingletonViewFn2<T> extends ViewFn<IterableView<T>, T>
- implements HasDefaultValue<T> {
+ private static class SingletonViewFn2<T> extends ViewFn<IterableView<T>, T> {
private byte @Nullable [] encodedDefaultValue;
private transient @Nullable T defaultValue;
private @Nullable Coder<T> valueCoder;
@@ -379,7 +350,6 @@
*
* @throws NoSuchElementException if no default was specified.
*/
- @Override
public T getDefaultValue() {
if (!hasDefault) {
throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
@@ -423,11 +393,6 @@
}
}
- @Internal
- public interface HasDefaultValue<T> {
- T getDefaultValue();
- }
-
/**
* Implementation which is able to adapt a multimap materialization to a {@code T}.
*
@@ -437,8 +402,7 @@
*/
@Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
- public static class SingletonViewFn<T> extends ViewFn<MultimapView<Void, T>, T>
- implements HasDefaultValue<T> {
+ public static class SingletonViewFn<T> extends ViewFn<MultimapView<Void, T>, T> {
private byte @Nullable [] encodedDefaultValue;
private transient @Nullable T defaultValue;
private @Nullable Coder<T> valueCoder;
@@ -476,7 +440,6 @@
*
* @throws NoSuchElementException if no default was specified.
*/
- @Override
public T getDefaultValue() {
if (!hasDefault) {
throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
@@ -530,8 +493,7 @@
* <p>{@link IterableViewFn} is meant to be removed in the future and replaced with this class.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
- @Internal
- public static class IterableViewFn2<T> extends ViewFn<IterableView<T>, Iterable<T>> {
+ private static class IterableViewFn2<T> extends ViewFn<IterableView<T>, Iterable<T>> {
private TypeDescriptorSupplier<T> typeDescriptorSupplier;
public IterableViewFn2(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
@@ -597,7 +559,7 @@
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
@VisibleForTesting
- public static class ListViewFn2<T>
+ static class ListViewFn2<T>
extends ViewFn<MultimapView<Long, ValueOrMetadata<T, OffsetRange>>, List<T>> {
private TypeDescriptorSupplier<T> typeDescriptorSupplier;
@@ -1041,8 +1003,7 @@
* <p>{@link MultimapViewFn} is meant to be removed in the future and replaced with this class.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
- @Internal
- public static class MultimapViewFn2<K, V>
+ private static class MultimapViewFn2<K, V>
extends ViewFn<MultimapView<K, V>, Map<K, Iterable<V>>> {
private TypeDescriptorSupplier<K> keyTypeDescriptorSupplier;
private TypeDescriptorSupplier<V> valueTypeDescriptorSupplier;
@@ -1130,8 +1091,7 @@
*
* <p>{@link MapViewFn} is meant to be removed in the future and replaced with this class.
*/
- @Internal
- public static class MapViewFn2<K, V> extends ViewFn<MultimapView<K, V>, Map<K, V>> {
+ private static class MapViewFn2<K, V> extends ViewFn<MultimapView<K, V>, Map<K, V>> {
private TypeDescriptorSupplier<K> keyTypeDescriptorSupplier;
private TypeDescriptorSupplier<V> valueTypeDescriptorSupplier;
@@ -1319,13 +1279,7 @@
@Override
public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("tag", tag)
- .add("viewFn", viewFn)
- .add("coder", coder)
- .add("windowMappingFn", windowMappingFn)
- .add("pCollection", pCollection)
- .toString();
+ return MoreObjects.toStringHelper(this).add("tag", tag).toString();
}
@Override
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index 61b4bf8..b340d91 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.testing;
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -40,42 +42,82 @@
// materializations will differ but test code should not worry about what these look like if
// they are relying on the ViewFn to "undo" the conversion.
- if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
- for (Object value : values) {
- rval.add(value);
- }
- } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
- for (Object value : values) {
- rval.add(value);
- }
- } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
- if (values.length > 0) {
- rval.add(
- KV.of(
- Long.MIN_VALUE, ValueOrMetadata.createMetadata(new OffsetRange(0, values.length))));
- for (int i = 0; i < values.length; ++i) {
- rval.add(KV.of((long) i, ValueOrMetadata.create(values[i])));
+ // TODO(BEAM-10097): Make this the default case once all portable runners can support
+ // the iterable access pattern.
+ if (hasExperiment(options, "beam_fn_api")
+ && (hasExperiment(options, "use_runner_v2")
+ || hasExperiment(options, "use_unified_worker"))) {
+ if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(value);
}
- }
- } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
- for (Object value : values) {
- rval.add(value);
- }
- } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
- for (Object value : values) {
- rval.add(value);
+ } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(value);
+ }
+ } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
+ if (values.length > 0) {
+ rval.add(
+ KV.of(
+ Long.MIN_VALUE,
+ ValueOrMetadata.createMetadata(new OffsetRange(0, values.length))));
+ for (int i = 0; i < values.length; ++i) {
+ rval.add(KV.of((long) i, ValueOrMetadata.create(values[i])));
+ }
+ }
+ } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(value);
+ }
+ } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(value);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown type of view %s. Supported views are %s.",
+ viewTransformClass.getClass(),
+ ImmutableSet.of(
+ View.AsSingleton.class,
+ View.AsIterable.class,
+ View.AsList.class,
+ View.AsMap.class,
+ View.AsMultimap.class)));
}
} else {
- throw new IllegalArgumentException(
- String.format(
- "Unknown type of view %s. Supported views are %s.",
- viewTransformClass.getClass(),
- ImmutableSet.of(
- View.AsSingleton.class,
- View.AsIterable.class,
- View.AsList.class,
- View.AsMap.class,
- View.AsMultimap.class)));
+ if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
+ }
+ } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
+ }
+ } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
+ }
+ } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
+ }
+ } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown type of view %s. Supported views are %s.",
+ viewTransformClass.getClass(),
+ ImmutableSet.of(
+ View.AsSingleton.class,
+ View.AsIterable.class,
+ View.AsList.class,
+ View.AsMap.class,
+ View.AsMultimap.class)));
+ }
}
return Collections.unmodifiableList(rval);
}
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
index 68139cf..f6d3340 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -48,7 +48,7 @@
from apache_beam.utils.timestamp import Timestamp
try:
- from google.cloud import bigquery
+ from google.cloud import bigquery # type: ignore
from google.cloud.bigquery.schema import SchemaField
from google.cloud.exceptions import NotFound
except ImportError: