Merge pull request #15469: [BEAM-12853] VALUES produces a UNION, window can't be set afterwards
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index fa7c1ec..c1220b0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -109,6 +109,7 @@
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
@@ -849,6 +850,57 @@
}
}
+ @VisibleForTesting
+ protected RunnerApi.Pipeline resolveArtifacts(RunnerApi.Pipeline pipeline) {
+ RunnerApi.Pipeline.Builder pipelineBuilder = pipeline.toBuilder();
+ RunnerApi.Components.Builder componentsBuilder = pipelineBuilder.getComponentsBuilder();
+ componentsBuilder.clearEnvironments();
+ for (Map.Entry<String, RunnerApi.Environment> entry :
+ pipeline.getComponents().getEnvironmentsMap().entrySet()) {
+ RunnerApi.Environment.Builder environmentBuilder = entry.getValue().toBuilder();
+ environmentBuilder.clearDependencies();
+ for (RunnerApi.ArtifactInformation info : entry.getValue().getDependenciesList()) {
+ if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE).equals(info.getTypeUrn())) {
+ throw new RuntimeException(
+ String.format("unsupported artifact type %s", info.getTypeUrn()));
+ }
+ RunnerApi.ArtifactFilePayload filePayload;
+ try {
+ filePayload = RunnerApi.ArtifactFilePayload.parseFrom(info.getTypePayload());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Error parsing artifact file payload.", e);
+ }
+ if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO)
+ .equals(info.getRoleUrn())) {
+ throw new RuntimeException(
+ String.format("unsupported artifact role %s", info.getRoleUrn()));
+ }
+ RunnerApi.ArtifactStagingToRolePayload stagingPayload;
+ try {
+ stagingPayload = RunnerApi.ArtifactStagingToRolePayload.parseFrom(info.getRolePayload());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Error parsing artifact staging_to role payload.", e);
+ }
+ environmentBuilder.addDependencies(
+ info.toBuilder()
+ .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+ .setTypePayload(
+ RunnerApi.ArtifactUrlPayload.newBuilder()
+ .setUrl(
+ FileSystems.matchNewResource(options.getStagingLocation(), true)
+ .resolve(
+ stagingPayload.getStagedName(),
+ ResolveOptions.StandardResolveOptions.RESOLVE_FILE)
+ .toString())
+ .setSha256(filePayload.getSha256())
+ .build()
+ .toByteString()));
+ }
+ componentsBuilder.putEnvironments(entry.getKey(), environmentBuilder.build());
+ }
+ return pipelineBuilder.build();
+ }
+
private List<DataflowPackage> stageArtifacts(RunnerApi.Pipeline pipeline) {
ImmutableList.Builder<StagedFile> filesToStageBuilder = ImmutableList.builder();
for (Map.Entry<String, RunnerApi.Environment> entry :
@@ -952,6 +1004,10 @@
RunnerApi.Pipeline portablePipelineProto =
PipelineTranslation.toProto(pipeline, portableComponents, false);
+ // Note that `stageArtifacts` has to be called before `resolveArtifacts` because
+ // `resolveArtifacts` updates local paths to staged paths in pipeline proto.
+ List<DataflowPackage> packages = stageArtifacts(portablePipelineProto);
+ portablePipelineProto = resolveArtifacts(portablePipelineProto);
LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
// Stage the portable pipeline proto, retrieving the staged pipeline path, then update
// the options on the new job
@@ -976,7 +1032,6 @@
RunnerApi.Pipeline dataflowV1PipelineProto =
PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
LOG.debug("Dataflow v1 pipeline proto:\n{}", TextFormat.printToString(dataflowV1PipelineProto));
- List<DataflowPackage> packages = stageArtifacts(dataflowV1PipelineProto);
// Set a unique client_request_id in the CreateJob request.
// This is used to ensure idempotence of job creation across retried
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 55aa182..f5bde8e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1179,6 +1179,97 @@
}
@Test
+ public void testResolveArtifacts() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ DataflowRunner runner = DataflowRunner.fromOptions(options);
+ String stagingLocation = options.getStagingLocation().replaceFirst("/$", "");
+ RunnerApi.ArtifactInformation fooLocalArtifact =
+ RunnerApi.ArtifactInformation.newBuilder()
+ .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE))
+ .setTypePayload(
+ RunnerApi.ArtifactFilePayload.newBuilder()
+ .setPath("/tmp/foo.jar")
+ .build()
+ .toByteString())
+ .setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+ .setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName("foo_staged.jar")
+ .build()
+ .toByteString())
+ .build();
+ RunnerApi.ArtifactInformation barLocalArtifact =
+ RunnerApi.ArtifactInformation.newBuilder()
+ .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE))
+ .setTypePayload(
+ RunnerApi.ArtifactFilePayload.newBuilder()
+ .setPath("/tmp/bar.jar")
+ .build()
+ .toByteString())
+ .setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+ .setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName("bar_staged.jar")
+ .build()
+ .toByteString())
+ .build();
+ RunnerApi.Pipeline pipeline =
+ RunnerApi.Pipeline.newBuilder()
+ .setComponents(
+ RunnerApi.Components.newBuilder()
+ .putEnvironments(
+ "env",
+ RunnerApi.Environment.newBuilder()
+ .addAllDependencies(
+ ImmutableList.of(fooLocalArtifact, barLocalArtifact))
+ .build()))
+ .build();
+
+ RunnerApi.ArtifactInformation fooStagedArtifact =
+ RunnerApi.ArtifactInformation.newBuilder()
+ .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+ .setTypePayload(
+ RunnerApi.ArtifactUrlPayload.newBuilder()
+ .setUrl(stagingLocation + "/foo_staged.jar")
+ .build()
+ .toByteString())
+ .setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+ .setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName("foo_staged.jar")
+ .build()
+ .toByteString())
+ .build();
+ RunnerApi.ArtifactInformation barStagedArtifact =
+ RunnerApi.ArtifactInformation.newBuilder()
+ .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+ .setTypePayload(
+ RunnerApi.ArtifactUrlPayload.newBuilder()
+ .setUrl(stagingLocation + "/bar_staged.jar")
+ .build()
+ .toByteString())
+ .setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+ .setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName("bar_staged.jar")
+ .build()
+ .toByteString())
+ .build();
+ RunnerApi.Pipeline expectedPipeline =
+ RunnerApi.Pipeline.newBuilder()
+ .setComponents(
+ RunnerApi.Components.newBuilder()
+ .putEnvironments(
+ "env",
+ RunnerApi.Environment.newBuilder()
+ .addAllDependencies(
+ ImmutableList.of(fooStagedArtifact, barStagedArtifact))
+ .build()))
+ .build();
+ assertThat(runner.resolveArtifacts(pipeline), equalTo(expectedPipeline));
+ }
+
+ @Test
public void testGcpTempAndNoTempLocationSucceeds() throws Exception {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
index 5b3c83e..14d7422 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
-import static junit.framework.TestCase.assertNull;
import static org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTable.METHOD_PROPERTY;
import static org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTable.WRITE_DISPOSITION_PROPERTY;
import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone;
@@ -42,7 +41,6 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamPushDownIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -740,12 +738,10 @@
BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement);
PCollection<Row> output = BeamSqlRelUtils.toPCollection(readPipeline, relNode);
- // Calc is not dropped because BigQuery does not support field reordering yet.
- assertThat(relNode, instanceOf(BeamCalcRel.class));
- assertThat(relNode.getInput(0), instanceOf(BeamPushDownIOSourceRel.class));
+ assertThat(relNode, instanceOf(BeamPushDownIOSourceRel.class));
// IO projects fields in the same order they are defined in the schema.
assertThat(
- relNode.getInput(0).getRowType().getFieldNames(),
+ relNode.getRowType().getFieldNames(),
containsInAnyOrder("c_tinyint", "c_integer", "c_varchar"));
// Field reordering is done in a Calc
assertThat(
@@ -816,15 +812,9 @@
BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement);
PCollection<Row> output = BeamSqlRelUtils.toPCollection(readPipeline, relNode);
- assertThat(relNode, instanceOf(BeamCalcRel.class));
- // Predicate should be pushed-down to IO level
- assertNull(((BeamCalcRel) relNode).getProgram().getCondition());
-
- assertThat(relNode.getInput(0), instanceOf(BeamPushDownIOSourceRel.class));
+ assertThat(relNode, instanceOf(BeamPushDownIOSourceRel.class));
// Unused fields should not be projected by an IO
- assertThat(
- relNode.getInput(0).getRowType().getFieldNames(),
- containsInAnyOrder("c_varchar", "c_integer"));
+ assertThat(relNode.getRowType().getFieldNames(), containsInAnyOrder("c_varchar", "c_integer"));
assertThat(
output.getSchema(),
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
index f17a96e..817570a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
@@ -106,7 +106,7 @@
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
sqlEnv.executeDdl(createFlatTableString(tableId, location(tableId)));
- String query = "SELECT key, boolColumn, intColumn, stringColumn, doubleColumn FROM flatTable";
+ String query = "SELECT key, boolColumn, longColumn, stringColumn, doubleColumn FROM flatTable";
sqlEnv.parseQuery(query);
PCollection<Row> queryOutput =
@@ -145,8 +145,8 @@
sqlEnv.executeDdl(createFlatTableString(tableId, location(tableId)));
String query =
- "INSERT INTO beamWriteTable(key, boolColumn, intColumn, stringColumn, doubleColumn) "
- + "VALUES ('key', TRUE, 10, 'stringValue', 5.5)";
+ "INSERT INTO beamWriteTable(key, boolColumn, longColumn, stringColumn, doubleColumn) "
+ + "VALUES ('key', TRUE, CAST(10 AS bigint), 'stringValue', 5.5)";
BeamSqlRelUtils.toPCollection(writePipeline, sqlEnv.parseQuery(query));
writePipeline.run().waitUntilFinish();
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java
index 4d74a06..0a9ab2f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java
@@ -100,7 +100,7 @@
String query =
String.format(
"INSERT INTO `%s`(key, boolColumn, longColumn, stringColumn, doubleColumn) "
- + "VALUES ('key1', FALSE, 1, 'string1', 1.0)",
+ + "VALUES ('key1', FALSE, CAST(1 as bigint), 'string1', 1.0)",
TABLE_ID);
BeamSqlRelUtils.toPCollection(p, sqlEnv.parseQuery(query));
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
index 5cc358a..f318676 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
@@ -39,7 +39,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
import org.checkerframework.checker.nullness.qual.Nullable;
class BigtableTableTestUtils {
@@ -48,12 +48,12 @@
static final String KEY2 = "key2";
static final String BOOL_COLUMN = "boolColumn";
- static final String INT_COLUMN = "intColumn";
+ static final String LONG_COLUMN = "longColumn";
static final String STRING_COLUMN = "stringColumn";
static final String DOUBLE_COLUMN = "doubleColumn";
static final String FAMILY_TEST = "familyTest";
- static final Schema INT_COLUMN_SCHEMA =
+ static final Schema LONG_COLUMN_SCHEMA =
Schema.builder()
.addInt64Field(VALUE)
.addInt64Field(TIMESTAMP_MICROS)
@@ -63,7 +63,7 @@
static final Schema TEST_FAMILY_SCHEMA =
Schema.builder()
.addBooleanField(BOOL_COLUMN)
- .addRowField(INT_COLUMN, INT_COLUMN_SCHEMA)
+ .addRowField(LONG_COLUMN, LONG_COLUMN_SCHEMA)
.addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
.addDoubleField(DOUBLE_COLUMN)
.build();
@@ -75,7 +75,7 @@
Schema.builder()
.addStringField(KEY)
.addBooleanField(BOOL_COLUMN)
- .addInt64Field(INT_COLUMN)
+ .addInt64Field(LONG_COLUMN)
.addStringField(STRING_COLUMN)
.addDoubleField(DOUBLE_COLUMN)
.build();
@@ -88,7 +88,7 @@
"CREATE EXTERNAL TABLE `%s`( \n"
+ " key VARCHAR NOT NULL, \n"
+ " boolColumn BOOLEAN NOT NULL, \n"
- + " intColumn BIGINT NOT NULL, \n"
+ + " longColumn BIGINT NOT NULL, \n"
+ " stringColumn VARCHAR NOT NULL, \n"
+ " doubleColumn DOUBLE NOT NULL \n"
+ ") \n"
@@ -105,7 +105,7 @@
+ " key VARCHAR NOT NULL, \n"
+ " familyTest ROW< \n"
+ " boolColumn BOOLEAN NOT NULL, \n"
- + " intColumn ROW< \n"
+ + " longColumn ROW< \n"
+ " val BIGINT NOT NULL, \n"
+ " timestampMicros BIGINT NOT NULL, \n"
+ " labels ARRAY<VARCHAR> NOT NULL \n"
@@ -123,7 +123,7 @@
return Schema.builder()
.addStringField(KEY)
.addBooleanField(BOOL_COLUMN)
- .addInt64Field("intValue")
+ .addInt64Field("longValue")
.addInt64Field(TIMESTAMP_MICROS)
.addArrayField(LABELS, Schema.FieldType.STRING)
.addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
@@ -155,7 +155,7 @@
}
static String columnsMappingString() {
- return "familyTest:boolColumn,familyTest:intColumn,familyTest:doubleColumn,"
+ return "familyTest:boolColumn,familyTest:longColumn,familyTest:doubleColumn,"
+ "familyTest:stringColumn";
}
@@ -170,7 +170,7 @@
ImmutableList.of(
column("boolColumn", booleanToByteArray(true)),
column("doubleColumn", doubleToByteArray(5.5)),
- column("intColumn", Ints.toByteArray(10)),
+ column("longColumn", Longs.toByteArray(10L)),
column("stringColumn", "stringValue".getBytes(UTF_8)));
Family family = Family.newBuilder().setName("familyTest").addAllColumns(columns).build();
return com.google.bigtable.v2.Row.newBuilder()
@@ -226,8 +226,8 @@
clientWrapper.writeRow(key, table, FAMILY_TEST, STRING_COLUMN, "string1".getBytes(UTF_8), NOW);
clientWrapper.writeRow(
key, table, FAMILY_TEST, STRING_COLUMN, "string2".getBytes(UTF_8), LATER);
- clientWrapper.writeRow(key, table, FAMILY_TEST, INT_COLUMN, longToByteArray(1L), NOW);
- clientWrapper.writeRow(key, table, FAMILY_TEST, INT_COLUMN, longToByteArray(2L), LATER);
+ clientWrapper.writeRow(key, table, FAMILY_TEST, LONG_COLUMN, longToByteArray(1L), NOW);
+ clientWrapper.writeRow(key, table, FAMILY_TEST, LONG_COLUMN, longToByteArray(2L), LATER);
clientWrapper.writeRow(key, table, FAMILY_TEST, DOUBLE_COLUMN, doubleToByteArray(1.10), NOW);
clientWrapper.writeRow(key, table, FAMILY_TEST, DOUBLE_COLUMN, doubleToByteArray(2.20), LATER);
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java
index a170ebf..4b60eb1 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java
@@ -96,9 +96,9 @@
String query =
"SELECT key, \n"
+ " bt.familyTest.boolColumn, \n"
- + " bt.familyTest.intColumn.val AS intValue, \n"
- + " bt.familyTest.intColumn.timestampMicros, \n"
- + " bt.familyTest.intColumn.labels, \n"
+ + " bt.familyTest.longColumn.val AS longValue, \n"
+ + " bt.familyTest.longColumn.timestampMicros, \n"
+ + " bt.familyTest.longColumn.labels, \n"
+ " bt.familyTest.stringColumn, \n"
+ " bt.familyTest.doubleColumn \n"
+ "FROM beamTable bt";