Merge pull request #14635: Inline some methods and variables for clarity.
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
index aae35ab..e32edf7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
@@ -26,7 +26,6 @@
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.PValues;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -41,17 +40,16 @@
private ReplacementOutputs() {}
public static Map<PCollection<?>, ReplacementOutput> singleton(
- Map<TupleTag<?>, PCollection<?>> original, PValue replacement) {
+ Map<TupleTag<?>, PCollection<?>> original, POutput replacement) {
Entry<TupleTag<?>, PCollection<?>> originalElement =
Iterables.getOnlyElement(original.entrySet());
- TupleTag<?> replacementTag = Iterables.getOnlyElement(replacement.expand().entrySet()).getKey();
- PCollection<?> replacementCollection =
- (PCollection<?>) Iterables.getOnlyElement(replacement.expand().entrySet()).getValue();
+ Entry<TupleTag<?>, PCollection<?>> replacementElement =
+ Iterables.getOnlyElement(PValues.expandOutput(replacement).entrySet());
return Collections.singletonMap(
- replacementCollection,
+ replacementElement.getValue(),
ReplacementOutput.of(
TaggedPValue.of(originalElement.getKey(), originalElement.getValue()),
- TaggedPValue.of(replacementTag, replacementCollection)));
+ TaggedPValue.of(replacementElement.getKey(), replacementElement.getValue())));
}
public static Map<PCollection<?>, ReplacementOutput> tagged(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 92c3e05..b04266c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -411,12 +411,12 @@
checkState(
this.outputs == null, "Tried to specify more than one output for %s", getFullName());
checkNotNull(output, "Tried to set the output of %s to null", getFullName());
- this.outputs = PValues.fullyExpand(output.expand());
+ this.outputs = PValues.expandOutput(output);
// Validate that a primitive transform produces only primitive output, and a composite
// transform does not produce primitive output.
Set<Node> outputProducers = new HashSet<>();
- for (PCollection<?> outputValue : PValues.fullyExpand(output.expand()).values()) {
+ for (PCollection<?> outputValue : PValues.expandOutput(output).values()) {
outputProducers.add(getProducer(outputValue));
}
if (outputProducers.contains(this) && (!parts.isEmpty() || outputProducers.size() > 1)) {