Merge pull request #15467: [BEAM-12851] Map output table names
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index f672621..ffe707f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -24,7 +24,10 @@
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSinkRule;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.RenameFields;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -132,7 +135,8 @@
"Wrong number of inputs for %s: %s",
BeamIOSinkRel.class.getSimpleName(),
pinput);
- PCollection<Row> input = pinput.get(0);
+ Schema schema = CalciteUtils.toSchema(getExpectedInputRowType(0));
+ PCollection<Row> input = pinput.get(0).apply(RenameFields.<Row>create()).setRowSchema(schema);
sqlTable.buildIOWriter(input);
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java
index b45986c..16d6c11 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java
@@ -57,7 +57,7 @@
+ "event_timestamp TIMESTAMP, \n"
+ "attributes MAP<VARCHAR, VARCHAR>, \n"
+ "payload ROW< \n"
- + " id INTEGER, \n"
+ + " id BIGINT, \n"
+ " name VARCHAR \n"
+ " > \n"
+ ") \n"
@@ -111,7 +111,7 @@
String pubsubTableString =
"CREATE EXTERNAL TABLE pubsub_topic (\n"
+ "event_timestamp TIMESTAMP, \n"
- + "id INTEGER, \n"
+ + "id BIGINT, \n"
+ "name VARCHAR \n"
+ ") \n"
+ "TYPE 'pubsub' \n"
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
index 58b0fc7..c8db3ba 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
@@ -47,6 +47,7 @@
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.TableModify.Operation;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexBuilder;
@@ -170,7 +171,7 @@
RelOptTableImpl.create(null, type, ImmutableList.of(), null),
null,
new BeamValuesRel(cluster, type, tuples, null),
- null,
+ Operation.INSERT,
null,
null,
false,