Merge pull request #15469: [BEAM-12853] VALUES produces a UNION, window can't be set afterwards

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index fa7c1ec..c1220b0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -109,6 +109,7 @@
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
@@ -849,6 +850,57 @@
     }
   }
 
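+  /**
+   * Rewrites each environment's FILE dependencies into URL dependencies that point at their
+   * staged locations under the staging location. Expects {@link #stageArtifacts} to have
+   * uploaded the files already.
+   */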
+  @VisibleForTesting
+  protected RunnerApi.Pipeline resolveArtifacts(RunnerApi.Pipeline pipeline) {
+    RunnerApi.Pipeline.Builder pipelineBuilder = pipeline.toBuilder();
+    RunnerApi.Components.Builder componentsBuilder = pipelineBuilder.getComponentsBuilder();
+    componentsBuilder.clearEnvironments();
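+    // Rebuild every environment from scratch so its dependency list can be replaced wholesale.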
+    for (Map.Entry<String, RunnerApi.Environment> entry :
+        pipeline.getComponents().getEnvironmentsMap().entrySet()) {
+      RunnerApi.Environment.Builder environmentBuilder = entry.getValue().toBuilder();
+      environmentBuilder.clearDependencies();
+      for (RunnerApi.ArtifactInformation info : entry.getValue().getDependenciesList()) {
+        if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE).equals(info.getTypeUrn())) {
+          throw new RuntimeException(
+              String.format("unsupported artifact type %s", info.getTypeUrn()));
+        }
+        RunnerApi.ArtifactFilePayload filePayload;
+        try {
+          filePayload = RunnerApi.ArtifactFilePayload.parseFrom(info.getTypePayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing artifact file payload.", e);
+        }
+        if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO)
+            .equals(info.getRoleUrn())) {
+          throw new RuntimeException(
+              String.format("unsupported artifact role %s", info.getRoleUrn()));
+        }
+        RunnerApi.ArtifactStagingToRolePayload stagingPayload;
+        try {
+          stagingPayload = RunnerApi.ArtifactStagingToRolePayload.parseFrom(info.getRolePayload());
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Error parsing artifact staging_to role payload.", e);
+        }
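+        // Convert the FILE artifact into a URL artifact that points at the staged copy,
+        // carrying the sha256 over so consumers can still verify the upload.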
+        environmentBuilder.addDependencies(
+            info.toBuilder()
+                .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+                .setTypePayload(
+                    RunnerApi.ArtifactUrlPayload.newBuilder()
+                        .setUrl(
+                            FileSystems.matchNewResource(options.getStagingLocation(), true)
+                                .resolve(
+                                    stagingPayload.getStagedName(),
+                                    ResolveOptions.StandardResolveOptions.RESOLVE_FILE)
+                                .toString())
+                        .setSha256(filePayload.getSha256())
+                        .build()
+                        .toByteString()));
+      }
+      componentsBuilder.putEnvironments(entry.getKey(), environmentBuilder.build());
+    }
+    return pipelineBuilder.build();
+  }
+
   private List<DataflowPackage> stageArtifacts(RunnerApi.Pipeline pipeline) {
     ImmutableList.Builder<StagedFile> filesToStageBuilder = ImmutableList.builder();
     for (Map.Entry<String, RunnerApi.Environment> entry :
@@ -952,6 +1004,10 @@
 
     RunnerApi.Pipeline portablePipelineProto =
         PipelineTranslation.toProto(pipeline, portableComponents, false);
+    // Note that `stageArtifacts` must be called before `resolveArtifacts` because
+    // `resolveArtifacts` rewrites the local artifact paths in the pipeline proto to staged paths.
+    List<DataflowPackage> packages = stageArtifacts(portablePipelineProto);
+    portablePipelineProto = resolveArtifacts(portablePipelineProto);
     LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
     // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
     // the options on the new job
@@ -976,7 +1032,6 @@
     RunnerApi.Pipeline dataflowV1PipelineProto =
         PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
     LOG.debug("Dataflow v1 pipeline proto:\n{}", TextFormat.printToString(dataflowV1PipelineProto));
-    List<DataflowPackage> packages = stageArtifacts(dataflowV1PipelineProto);
 
     // Set a unique client_request_id in the CreateJob request.
     // This is used to ensure idempotence of job creation across retried
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 55aa182..f5bde8e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1179,6 +1179,97 @@
   }
 
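+  /** Verifies that resolveArtifacts converts local FILE artifacts into staged URL artifacts. */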
   @Test
+  public void testResolveArtifacts() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    String stagingLocation = options.getStagingLocation().replaceFirst("/$", "");
+    RunnerApi.ArtifactInformation fooLocalArtifact =
+        RunnerApi.ArtifactInformation.newBuilder()
+            .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE))
+            .setTypePayload(
+                RunnerApi.ArtifactFilePayload.newBuilder()
+                    .setPath("/tmp/foo.jar")
+                    .build()
+                    .toByteString())
+            .setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+            .setRolePayload(
+                RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+                    .setStagedName("foo_staged.jar")
+                    .build()
+                    .toByteString())
+            .build();
+    RunnerApi.ArtifactInformation barLocalArtifact =
+        RunnerApi.ArtifactInformation.newBuilder()
+            .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE))
+            .setTypePayload(
+                RunnerApi.ArtifactFilePayload.newBuilder()
+                    .setPath("/tmp/bar.jar")
+                    .build()
+                    .toByteString())
+            .setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+            .setRolePayload(
+                RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+                    .setStagedName("bar_staged.jar")
+                    .build()
+                    .toByteString())
+            .build();
+    RunnerApi.Pipeline pipeline =
+        RunnerApi.Pipeline.newBuilder()
+            .setComponents(
+                RunnerApi.Components.newBuilder()
+                    .putEnvironments(
+                        "env",
+                        RunnerApi.Environment.newBuilder()
+                            .addAllDependencies(
+                                ImmutableList.of(fooLocalArtifact, barLocalArtifact))
+                            .build()))
+            .build();
+
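+    // Resolution should turn each FILE artifact into a URL artifact located at
+    // <stagingLocation>/<stagedName>, leaving the staging_to role payload untouched.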
+    RunnerApi.ArtifactInformation fooStagedArtifact =
+        RunnerApi.ArtifactInformation.newBuilder()
+            .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+            .setTypePayload(
+                RunnerApi.ArtifactUrlPayload.newBuilder()
+                    .setUrl(stagingLocation + "/foo_staged.jar")
+                    .build()
+                    .toByteString())
+            .setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+            .setRolePayload(
+                RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+                    .setStagedName("foo_staged.jar")
+                    .build()
+                    .toByteString())
+            .build();
+    RunnerApi.ArtifactInformation barStagedArtifact =
+        RunnerApi.ArtifactInformation.newBuilder()
+            .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+            .setTypePayload(
+                RunnerApi.ArtifactUrlPayload.newBuilder()
+                    .setUrl(stagingLocation + "/bar_staged.jar")
+                    .build()
+                    .toByteString())
+            .setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+            .setRolePayload(
+                RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+                    .setStagedName("bar_staged.jar")
+                    .build()
+                    .toByteString())
+            .build();
+    RunnerApi.Pipeline expectedPipeline =
+        RunnerApi.Pipeline.newBuilder()
+            .setComponents(
+                RunnerApi.Components.newBuilder()
+                    .putEnvironments(
+                        "env",
+                        RunnerApi.Environment.newBuilder()
+                            .addAllDependencies(
+                                ImmutableList.of(fooStagedArtifact, barStagedArtifact))
+                            .build()))
+            .build();
+    assertThat(runner.resolveArtifacts(pipeline), equalTo(expectedPipeline));
+  }
+
+  @Test
   public void testGcpTempAndNoTempLocationSucceeds() throws Exception {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
index 5b3c83e..14d7422 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
 
-import static junit.framework.TestCase.assertNull;
 import static org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTable.METHOD_PROPERTY;
 import static org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTable.WRITE_DISPOSITION_PROPERTY;
 import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone;
@@ -42,7 +41,6 @@
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamPushDownIOSourceRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -740,12 +738,10 @@
     BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement);
     PCollection<Row> output = BeamSqlRelUtils.toPCollection(readPipeline, relNode);
 
-    // Calc is not dropped because BigQuery does not support field reordering yet.
-    assertThat(relNode, instanceOf(BeamCalcRel.class));
-    assertThat(relNode.getInput(0), instanceOf(BeamPushDownIOSourceRel.class));
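+    // The Calc is now dropped: the full projection is pushed down into the IO rel.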
+    assertThat(relNode, instanceOf(BeamPushDownIOSourceRel.class));
     // IO projects fields in the same order they are defined in the schema.
     assertThat(
-        relNode.getInput(0).getRowType().getFieldNames(),
+        relNode.getRowType().getFieldNames(),
         containsInAnyOrder("c_tinyint", "c_integer", "c_varchar"));
     // Field reordering is done in a Calc
     assertThat(
@@ -816,15 +812,9 @@
     BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement);
     PCollection<Row> output = BeamSqlRelUtils.toPCollection(readPipeline, relNode);
 
-    assertThat(relNode, instanceOf(BeamCalcRel.class));
-    // Predicate should be pushed-down to IO level
-    assertNull(((BeamCalcRel) relNode).getProgram().getCondition());
-
-    assertThat(relNode.getInput(0), instanceOf(BeamPushDownIOSourceRel.class));
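+    // Both the projection and the predicate are pushed into the IO, so no Calc remains on top.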
+    assertThat(relNode, instanceOf(BeamPushDownIOSourceRel.class));
     // Unused fields should not be projected by an IO
-    assertThat(
-        relNode.getInput(0).getRowType().getFieldNames(),
-        containsInAnyOrder("c_varchar", "c_integer"));
+    assertThat(relNode.getRowType().getFieldNames(), containsInAnyOrder("c_varchar", "c_integer"));
 
     assertThat(
         output.getSchema(),
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
index f17a96e..817570a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
@@ -106,7 +106,7 @@
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
     sqlEnv.executeDdl(createFlatTableString(tableId, location(tableId)));
 
-    String query = "SELECT key, boolColumn, intColumn, stringColumn, doubleColumn FROM flatTable";
+    String query = "SELECT key, boolColumn, longColumn, stringColumn, doubleColumn FROM flatTable";
 
     sqlEnv.parseQuery(query);
     PCollection<Row> queryOutput =
@@ -145,8 +145,8 @@
     sqlEnv.executeDdl(createFlatTableString(tableId, location(tableId)));
 
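+    // CAST the literal to BIGINT so it matches the longColumn type declared in the DDL.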
     String query =
-        "INSERT INTO beamWriteTable(key, boolColumn, intColumn, stringColumn, doubleColumn) "
-            + "VALUES ('key', TRUE, 10, 'stringValue', 5.5)";
+        "INSERT INTO beamWriteTable(key, boolColumn, longColumn, stringColumn, doubleColumn) "
+            + "VALUES ('key', TRUE, CAST(10 AS bigint), 'stringValue', 5.5)";
 
     BeamSqlRelUtils.toPCollection(writePipeline, sqlEnv.parseQuery(query));
     writePipeline.run().waitUntilFinish();
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java
index 4d74a06..0a9ab2f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java
@@ -100,7 +100,7 @@
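+    // CAST keeps the literal a BIGINT, matching the longColumn type.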
     String query =
         String.format(
             "INSERT INTO `%s`(key, boolColumn, longColumn, stringColumn, doubleColumn) "
-                + "VALUES ('key1', FALSE, 1, 'string1', 1.0)",
+                + "VALUES ('key1', FALSE, CAST(1 as bigint), 'string1', 1.0)",
             TABLE_ID);
 
     BeamSqlRelUtils.toPCollection(p, sqlEnv.parseQuery(query));
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
index 5cc358a..f318676 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
@@ -39,7 +39,7 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 class BigtableTableTestUtils {
@@ -48,12 +48,12 @@
   static final String KEY2 = "key2";
 
   static final String BOOL_COLUMN = "boolColumn";
-  static final String INT_COLUMN = "intColumn";
+  static final String LONG_COLUMN = "longColumn";
   static final String STRING_COLUMN = "stringColumn";
   static final String DOUBLE_COLUMN = "doubleColumn";
   static final String FAMILY_TEST = "familyTest";
 
-  static final Schema INT_COLUMN_SCHEMA =
+  static final Schema LONG_COLUMN_SCHEMA =
       Schema.builder()
           .addInt64Field(VALUE)
           .addInt64Field(TIMESTAMP_MICROS)
@@ -63,7 +63,7 @@
   static final Schema TEST_FAMILY_SCHEMA =
       Schema.builder()
           .addBooleanField(BOOL_COLUMN)
-          .addRowField(INT_COLUMN, INT_COLUMN_SCHEMA)
+          .addRowField(LONG_COLUMN, LONG_COLUMN_SCHEMA)
           .addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
           .addDoubleField(DOUBLE_COLUMN)
           .build();
@@ -75,7 +75,7 @@
       Schema.builder()
           .addStringField(KEY)
           .addBooleanField(BOOL_COLUMN)
-          .addInt64Field(INT_COLUMN)
+          .addInt64Field(LONG_COLUMN)
           .addStringField(STRING_COLUMN)
           .addDoubleField(DOUBLE_COLUMN)
           .build();
@@ -88,7 +88,7 @@
         "CREATE EXTERNAL TABLE `%s`( \n"
             + "  key VARCHAR NOT NULL, \n"
             + "  boolColumn BOOLEAN NOT NULL, \n"
-            + "  intColumn BIGINT NOT NULL, \n"
+            + "  longColumn BIGINT NOT NULL, \n"
             + "  stringColumn VARCHAR NOT NULL, \n"
             + "  doubleColumn DOUBLE NOT NULL \n"
             + ") \n"
@@ -105,7 +105,7 @@
             + "  key VARCHAR NOT NULL, \n"
             + "  familyTest ROW< \n"
             + "    boolColumn BOOLEAN NOT NULL, \n"
-            + "    intColumn ROW< \n"
+            + "    longColumn ROW< \n"
             + "      val BIGINT NOT NULL, \n"
             + "      timestampMicros BIGINT NOT NULL, \n"
             + "      labels ARRAY<VARCHAR> NOT NULL \n"
@@ -123,7 +123,7 @@
     return Schema.builder()
         .addStringField(KEY)
         .addBooleanField(BOOL_COLUMN)
-        .addInt64Field("intValue")
+        .addInt64Field("longValue")
         .addInt64Field(TIMESTAMP_MICROS)
         .addArrayField(LABELS, Schema.FieldType.STRING)
         .addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
@@ -155,7 +155,7 @@
   }
 
   static String columnsMappingString() {
-    return "familyTest:boolColumn,familyTest:intColumn,familyTest:doubleColumn,"
+    return "familyTest:boolColumn,familyTest:longColumn,familyTest:doubleColumn,"
         + "familyTest:stringColumn";
   }
 
@@ -170,7 +170,7 @@
         ImmutableList.of(
             column("boolColumn", booleanToByteArray(true)),
             column("doubleColumn", doubleToByteArray(5.5)),
-            column("intColumn", Ints.toByteArray(10)),
+            column("longColumn", Longs.toByteArray(10L)),
             column("stringColumn", "stringValue".getBytes(UTF_8)));
     Family family = Family.newBuilder().setName("familyTest").addAllColumns(columns).build();
     return com.google.bigtable.v2.Row.newBuilder()
@@ -226,8 +226,8 @@
     clientWrapper.writeRow(key, table, FAMILY_TEST, STRING_COLUMN, "string1".getBytes(UTF_8), NOW);
     clientWrapper.writeRow(
         key, table, FAMILY_TEST, STRING_COLUMN, "string2".getBytes(UTF_8), LATER);
-    clientWrapper.writeRow(key, table, FAMILY_TEST, INT_COLUMN, longToByteArray(1L), NOW);
-    clientWrapper.writeRow(key, table, FAMILY_TEST, INT_COLUMN, longToByteArray(2L), LATER);
+    clientWrapper.writeRow(key, table, FAMILY_TEST, LONG_COLUMN, longToByteArray(1L), NOW);
+    clientWrapper.writeRow(key, table, FAMILY_TEST, LONG_COLUMN, longToByteArray(2L), LATER);
     clientWrapper.writeRow(key, table, FAMILY_TEST, DOUBLE_COLUMN, doubleToByteArray(1.10), NOW);
     clientWrapper.writeRow(key, table, FAMILY_TEST, DOUBLE_COLUMN, doubleToByteArray(2.20), LATER);
   }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java
index a170ebf..4b60eb1 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java
@@ -96,9 +96,9 @@
     String query =
         "SELECT key, \n"
             + "  bt.familyTest.boolColumn, \n"
-            + "  bt.familyTest.intColumn.val AS intValue, \n"
-            + "  bt.familyTest.intColumn.timestampMicros, \n"
-            + "  bt.familyTest.intColumn.labels, \n"
+            + "  bt.familyTest.longColumn.val AS longValue, \n"
+            + "  bt.familyTest.longColumn.timestampMicros, \n"
+            + "  bt.familyTest.longColumn.labels, \n"
             + "  bt.familyTest.stringColumn, \n"
             + "  bt.familyTest.doubleColumn \n"
             + "FROM beamTable bt";