commit | 098218b85e54a810c34c685ac308fdf4c88bca41 | [log] [tgz] |
---|---|---|
author | Andrew Pilloud <apilloud@google.com> | Mon May 03 12:10:51 2021 -0700 |
committer | Andrew Pilloud <apilloud@google.com> | Thu Sep 02 12:37:48 2021 -0700 |
tree | ec930b44e1e8d0791775c782fa2697a813e2e052 | |
parent | b4e163cd857afbc375400089a4324359f2831112 [diff] |
Handle BeamRelNode in RelSubset
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java index b35d84c..9dd731f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
@@ -50,7 +50,16 @@ } else { return PCollectionList.of( inputRels.stream() - .map(input -> BeamSqlRelUtils.toPCollection(pipeline, (BeamRelNode) input, cache)) + .map( + input -> { + final BeamRelNode beamRel; + if (input instanceof RelSubset) { + beamRel = (BeamRelNode) ((RelSubset) input).getBest(); + } else { + beamRel = (BeamRelNode) input; + } + return BeamSqlRelUtils.toPCollection(pipeline, beamRel, cache); + }) .collect(Collectors.toList())); } }