Merge pull request #15468: [BEAM-12852] Revert change to int, use bigint in BigTable test
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index fa7c1ec..c1220b0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -109,6 +109,7 @@
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
@@ -849,6 +850,57 @@
}
}
+ @VisibleForTesting
+ protected RunnerApi.Pipeline resolveArtifacts(RunnerApi.Pipeline pipeline) {
+ RunnerApi.Pipeline.Builder pipelineBuilder = pipeline.toBuilder();
+ RunnerApi.Components.Builder componentsBuilder = pipelineBuilder.getComponentsBuilder();
+ componentsBuilder.clearEnvironments();
+ for (Map.Entry<String, RunnerApi.Environment> entry :
+ pipeline.getComponents().getEnvironmentsMap().entrySet()) {
+ RunnerApi.Environment.Builder environmentBuilder = entry.getValue().toBuilder();
+ environmentBuilder.clearDependencies();
+ for (RunnerApi.ArtifactInformation info : entry.getValue().getDependenciesList()) {
+ if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE).equals(info.getTypeUrn())) {
+ throw new RuntimeException(
+ String.format("unsupported artifact type %s", info.getTypeUrn()));
+ }
+ RunnerApi.ArtifactFilePayload filePayload;
+ try {
+ filePayload = RunnerApi.ArtifactFilePayload.parseFrom(info.getTypePayload());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Error parsing artifact file payload.", e);
+ }
+ if (!BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO)
+ .equals(info.getRoleUrn())) {
+ throw new RuntimeException(
+ String.format("unsupported artifact role %s", info.getRoleUrn()));
+ }
+ RunnerApi.ArtifactStagingToRolePayload stagingPayload;
+ try {
+ stagingPayload = RunnerApi.ArtifactStagingToRolePayload.parseFrom(info.getRolePayload());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Error parsing artifact staging_to role payload.", e);
+ }
+ environmentBuilder.addDependencies(
+ info.toBuilder()
+ .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+ .setTypePayload(
+ RunnerApi.ArtifactUrlPayload.newBuilder()
+ .setUrl(
+ FileSystems.matchNewResource(options.getStagingLocation(), true)
+ .resolve(
+ stagingPayload.getStagedName(),
+ ResolveOptions.StandardResolveOptions.RESOLVE_FILE)
+ .toString())
+ .setSha256(filePayload.getSha256())
+ .build()
+ .toByteString()));
+ }
+ componentsBuilder.putEnvironments(entry.getKey(), environmentBuilder.build());
+ }
+ return pipelineBuilder.build();
+ }
+
private List<DataflowPackage> stageArtifacts(RunnerApi.Pipeline pipeline) {
ImmutableList.Builder<StagedFile> filesToStageBuilder = ImmutableList.builder();
for (Map.Entry<String, RunnerApi.Environment> entry :
@@ -952,6 +1004,10 @@
RunnerApi.Pipeline portablePipelineProto =
PipelineTranslation.toProto(pipeline, portableComponents, false);
+ // Note that `stageArtifacts` has to be called before `resolveArtifacts` because
+ // `resolveArtifacts` updates local paths to staged paths in the pipeline proto.
+ List<DataflowPackage> packages = stageArtifacts(portablePipelineProto);
+ portablePipelineProto = resolveArtifacts(portablePipelineProto);
LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
// Stage the portable pipeline proto, retrieving the staged pipeline path, then update
// the options on the new job
@@ -976,7 +1032,6 @@
RunnerApi.Pipeline dataflowV1PipelineProto =
PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
LOG.debug("Dataflow v1 pipeline proto:\n{}", TextFormat.printToString(dataflowV1PipelineProto));
- List<DataflowPackage> packages = stageArtifacts(dataflowV1PipelineProto);
// Set a unique client_request_id in the CreateJob request.
// This is used to ensure idempotence of job creation across retried
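The staged-path computation in resolveArtifacts above boils down to resolving the artifact's staged name against the staging location. Below is a minimal sketch of that step, using a hypothetical local staging directory (DataflowRunner itself passes options.getStagingLocation(), typically a gs:// path, which additionally requires the GCP filesystem registrar on the classpath):

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;

public class StagedPathSketch {
  public static void main(String[] args) {
    // Hypothetical staging location; the runner uses options.getStagingLocation().
    String stagingLocation = "/tmp/staging";
    // Interpret the staging location as a directory resource.
    ResourceId stagingDir = FileSystems.matchNewResource(stagingLocation, /* isDirectory= */ true);
    // Resolve the staged artifact name against it, mirroring the call chain
    // used when building the ArtifactUrlPayload above.
    ResourceId stagedJar = stagingDir.resolve("foo_staged.jar", StandardResolveOptions.RESOLVE_FILE);
    System.out.println(stagedJar); // /tmp/staging/foo_staged.jar
  }
}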
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 55aa182..f5bde8e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1179,6 +1179,97 @@
}
@Test
+ public void testResolveArtifacts() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ DataflowRunner runner = DataflowRunner.fromOptions(options);
+ String stagingLocation = options.getStagingLocation().replaceFirst("/$", "");
+ RunnerApi.ArtifactInformation fooLocalArtifact =
+ RunnerApi.ArtifactInformation.newBuilder()
+ .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE))
+ .setTypePayload(
+ RunnerApi.ArtifactFilePayload.newBuilder()
+ .setPath("/tmp/foo.jar")
+ .build()
+ .toByteString())
+ .setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+ .setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName("foo_staged.jar")
+ .build()
+ .toByteString())
+ .build();
+ RunnerApi.ArtifactInformation barLocalArtifact =
+ RunnerApi.ArtifactInformation.newBuilder()
+ .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.FILE))
+ .setTypePayload(
+ RunnerApi.ArtifactFilePayload.newBuilder()
+ .setPath("/tmp/bar.jar")
+ .build()
+ .toByteString())
+ .setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+ .setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName("bar_staged.jar")
+ .build()
+ .toByteString())
+ .build();
+ RunnerApi.Pipeline pipeline =
+ RunnerApi.Pipeline.newBuilder()
+ .setComponents(
+ RunnerApi.Components.newBuilder()
+ .putEnvironments(
+ "env",
+ RunnerApi.Environment.newBuilder()
+ .addAllDependencies(
+ ImmutableList.of(fooLocalArtifact, barLocalArtifact))
+ .build()))
+ .build();
+
+ RunnerApi.ArtifactInformation fooStagedArtifact =
+ RunnerApi.ArtifactInformation.newBuilder()
+ .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+ .setTypePayload(
+ RunnerApi.ArtifactUrlPayload.newBuilder()
+ .setUrl(stagingLocation + "/foo_staged.jar")
+ .build()
+ .toByteString())
+ .setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+ .setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName("foo_staged.jar")
+ .build()
+ .toByteString())
+ .build();
+ RunnerApi.ArtifactInformation barStagedArtifact =
+ RunnerApi.ArtifactInformation.newBuilder()
+ .setTypeUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Types.URL))
+ .setTypePayload(
+ RunnerApi.ArtifactUrlPayload.newBuilder()
+ .setUrl(stagingLocation + "/bar_staged.jar")
+ .build()
+ .toByteString())
+ .setRoleUrn(BeamUrns.getUrn(RunnerApi.StandardArtifacts.Roles.STAGING_TO))
+ .setRolePayload(
+ RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+ .setStagedName("bar_staged.jar")
+ .build()
+ .toByteString())
+ .build();
+ RunnerApi.Pipeline expectedPipeline =
+ RunnerApi.Pipeline.newBuilder()
+ .setComponents(
+ RunnerApi.Components.newBuilder()
+ .putEnvironments(
+ "env",
+ RunnerApi.Environment.newBuilder()
+ .addAllDependencies(
+ ImmutableList.of(fooStagedArtifact, barStagedArtifact))
+ .build()))
+ .build();
+ assertThat(runner.resolveArtifacts(pipeline), equalTo(expectedPipeline));
+ }
+
+ @Test
public void testGcpTempAndNoTempLocationSucceeds() throws Exception {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
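The fooLocalArtifact and barLocalArtifact fixtures above embed their payloads as serialized protos, which is also how resolveArtifacts reads them back. A small round-trip sketch follows, with a hypothetical path and digest; it assumes the beam-model-pipeline protos and a matching protobuf runtime are on the classpath (some Beam versions use vendored protobuf classes instead):

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.model.pipeline.v1.RunnerApi;

public class PayloadRoundTripSketch {
  public static void main(String[] args) throws InvalidProtocolBufferException {
    // Hypothetical local artifact, mirroring fooLocalArtifact in the test above.
    ByteString typePayload =
        RunnerApi.ArtifactFilePayload.newBuilder()
            .setPath("/tmp/foo.jar")
            .setSha256("deadbeef") // hypothetical digest
            .build()
            .toByteString();
    // resolveArtifacts parses this payload out of each ArtifactInformation to
    // recover the sha256 it copies into the replacement ArtifactUrlPayload.
    RunnerApi.ArtifactFilePayload parsed = RunnerApi.ArtifactFilePayload.parseFrom(typePayload);
    System.out.println(parsed.getPath());   // /tmp/foo.jar
    System.out.println(parsed.getSha256()); // deadbeef
  }
}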
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
index 5b3c83e..14d7422 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
-import static junit.framework.TestCase.assertNull;
import static org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTable.METHOD_PROPERTY;
import static org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTable.WRITE_DISPOSITION_PROPERTY;
import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone;
@@ -42,7 +41,6 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamPushDownIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -740,12 +738,10 @@
BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement);
PCollection<Row> output = BeamSqlRelUtils.toPCollection(readPipeline, relNode);
- // Calc is not dropped because BigQuery does not support field reordering yet.
- assertThat(relNode, instanceOf(BeamCalcRel.class));
- assertThat(relNode.getInput(0), instanceOf(BeamPushDownIOSourceRel.class));
+ assertThat(relNode, instanceOf(BeamPushDownIOSourceRel.class));
// IO projects fields in the same order they are defined in the schema.
assertThat(
- relNode.getInput(0).getRowType().getFieldNames(),
+ relNode.getRowType().getFieldNames(),
containsInAnyOrder("c_tinyint", "c_integer", "c_varchar"));
// Field reordering is done in a Calc
assertThat(
@@ -816,15 +812,9 @@
BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement);
PCollection<Row> output = BeamSqlRelUtils.toPCollection(readPipeline, relNode);
- assertThat(relNode, instanceOf(BeamCalcRel.class));
- // Predicate should be pushed-down to IO level
- assertNull(((BeamCalcRel) relNode).getProgram().getCondition());
-
- assertThat(relNode.getInput(0), instanceOf(BeamPushDownIOSourceRel.class));
+ assertThat(relNode, instanceOf(BeamPushDownIOSourceRel.class));
// Unused fields should not be projected by an IO
- assertThat(
- relNode.getInput(0).getRowType().getFieldNames(),
- containsInAnyOrder("c_varchar", "c_integer"));
+ assertThat(relNode.getRowType().getFieldNames(), containsInAnyOrder("c_varchar", "c_integer"));
assertThat(
output.getSchema(),