Merge pull request #14874 from emilymye/release-2.30.0
[BEAM-12337] Cherrypick #14814: Make default UW image for java beam_java8/11_sdk
diff --git a/release/src/main/scripts/set_version.sh b/release/src/main/scripts/set_version.sh
index 1789f8e..72af526 100755
--- a/release/src/main/scripts/set_version.sh
+++ b/release/src/main/scripts/set_version.sh
@@ -77,7 +77,8 @@
sed -i -e "s/sdk_version=.*/sdk_version=$TARGET_VERSION/" gradle.properties
sed -i -e "s/SdkVersion = .*/SdkVersion = \"$TARGET_VERSION\"/" sdks/go/pkg/beam/core/core.go
# TODO: [BEAM-4767]
- sed -i -e "s/'dataflow.container_version' : .*/'dataflow.container_version' : 'beam-${TARGET_VERSION}'/" runners/google-cloud-dataflow-java/build.gradle
+ sed -i -e "s/'dataflow.fnapi_container_version' : .*/'dataflow.fnapi_container_version' : 'beam-${TARGET_VERSION}'/" runners/google-cloud-dataflow-java/build.gradle
+ sed -i -e "s/'dataflow.legacy_container_version' : .*/'dataflow.legacy_container_version' : 'beam-${TARGET_VERSION}'/" runners/google-cloud-dataflow-java/build.gradle
else
# For snapshot version:
# Java/gradle appends -SNAPSHOT
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index 222818d..a9232e3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -90,20 +90,22 @@
.build();
public enum JavaVersion {
- v8("java8", "1.8"),
- v11("java11", "11");
+ java8("java", "1.8"),
+ java11("java11", "11");
- private final String name;
+ // Legacy name, as used in container image
+ private final String legacyName;
+
+ // Specification version (e.g. System java.specification.version)
private final String specification;
- JavaVersion(final String name, final String specification) {
- this.name = name;
+ JavaVersion(final String legacyName, final String specification) {
+ this.legacyName = legacyName;
this.specification = specification;
}
- @Override
- public String toString() {
- return this.name;
+ public String legacyName() {
+ return this.legacyName;
}
public String specification() {
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
index 082aac6..d980d4a 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
@@ -254,14 +254,14 @@
@Test
public void testJavaVersion() {
- assertEquals(JavaVersion.v8, JavaVersion.forSpecification("1.8"));
- assertEquals("java8", JavaVersion.v8.toString());
- assertEquals(JavaVersion.v11, JavaVersion.forSpecification("11"));
- assertEquals("java11", JavaVersion.v11.toString());
+ assertEquals(JavaVersion.java8, JavaVersion.forSpecification("1.8"));
+ assertEquals("java", JavaVersion.java8.legacyName());
+ assertEquals(JavaVersion.java11, JavaVersion.forSpecification("11"));
+ assertEquals("java11", JavaVersion.java11.legacyName());
}
@Test(expected = UnsupportedOperationException.class)
public void testJavaVersionInvalid() {
- assertEquals(JavaVersion.v8, JavaVersion.forSpecification("invalid"));
+ assertEquals(JavaVersion.java8, JavaVersion.forSpecification("invalid"));
}
}
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index 240318f..6175c30 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -45,7 +45,8 @@
filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [
'dataflow.legacy_environment_major_version' : '8',
'dataflow.fnapi_environment_major_version' : '8',
- 'dataflow.container_version' : 'beam-2.30.0',
+ 'dataflow.legacy_container_version' : 'beam-2.30.0',
+ 'dataflow.fnapi_container_version' : 'beam-2.30.0',
'dataflow.container_base_repository' : 'gcr.io/cloud-dataflow/v1beta3',
]
}
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 68363a8..fa7c1ec 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -148,6 +148,7 @@
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
@@ -382,7 +383,7 @@
// Adding the Java version to the SDK name for user's and support convenience.
String agentJavaVer =
- (Environments.getJavaVersion() == Environments.JavaVersion.v8)
+ (Environments.getJavaVersion() == Environments.JavaVersion.java8)
? "(JRE 8 environment)"
: "(JDK 11 environment)";
@@ -2181,7 +2182,6 @@
return getDefaultContainerImageUrl(options);
} else if (containerImage.contains("IMAGE")) {
// Replace placeholder with default image name
- // TODO(emilymye): See if we can remove this placeholder
return containerImage.replace("IMAGE", getDefaultContainerImageNameForJob(options));
} else {
return containerImage;
@@ -2195,27 +2195,38 @@
"%s/%s:%s",
dataflowRunnerInfo.getContainerImageBaseRepository(),
getDefaultContainerImageNameForJob(options),
- dataflowRunnerInfo.getContainerVersion());
+ getDefaultContainerVersion(options));
}
/**
- * Construct the default Dataflow container image name based on pipeline type and Environment Java
- * version.
+ * Construct the default Dataflow container image name based on pipeline type and Java version.
*/
static String getDefaultContainerImageNameForJob(DataflowPipelineOptions options) {
Environments.JavaVersion javaVersion = Environments.getJavaVersion();
- String legacyJavaVersionId =
- (javaVersion == Environments.JavaVersion.v8) ? "java" : javaVersion.toString();
-
if (useUnifiedWorker(options)) {
- return "java";
+ return String.format("beam_%s_sdk", javaVersion.name());
} else if (options.isStreaming()) {
- return String.format("beam-%s-streaming", legacyJavaVersionId);
+ return String.format("beam-%s-streaming", javaVersion.legacyName());
} else {
- return String.format("beam-%s-batch", legacyJavaVersionId);
+ return String.format("beam-%s-batch", javaVersion.legacyName());
}
}
+ /**
+ * Construct the default Dataflow container image version (tag) based on SDK release and pipeline type.
+ */
+ static String getDefaultContainerVersion(DataflowPipelineOptions options) {
+ DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
+ ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
+ if (releaseInfo.isDevSdkVersion()) {
+ if (useUnifiedWorker(options)) {
+ return dataflowRunnerInfo.getFnApiDevContainerVersion();
+ }
+ return dataflowRunnerInfo.getLegacyDevContainerVersion();
+ }
+ return releaseInfo.getSdkVersion();
+ }
+
static boolean useUnifiedWorker(DataflowPipelineOptions options) {
return hasExperiment(options, "beam_fn_api")
|| hasExperiment(options, "use_runner_v2")
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
index 4c5b272..fed503d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
@@ -41,7 +41,8 @@
"fnapi.environment.major.version";
private static final String LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY =
"legacy.environment.major.version";
- private static final String CONTAINER_VERSION_KEY = "container.version";
+ private static final String CONTAINER_FNAPI_VERSION_KEY = "fnapi.container.version";
+ private static final String CONTAINER_LEGACY_VERSION_KEY = "legacy.container.version";
private static final String CONTAINER_BASE_REPOSITORY_KEY = "container.base_repository";
private static class LazyInit {
@@ -100,10 +101,18 @@
return properties.get(FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY);
}
- /** Provides the version/tag for constructing the container image path. */
- public String getContainerVersion() {
- checkState(properties.containsKey(CONTAINER_VERSION_KEY), "Unknown container version");
- return properties.get(CONTAINER_VERSION_KEY);
+ /** Provides the version/tag for dev SDK FnAPI container image. */
+ public String getFnApiDevContainerVersion() {
+ checkState(
+ properties.containsKey(CONTAINER_FNAPI_VERSION_KEY), "Unknown FnAPI container version");
+ return properties.get(CONTAINER_FNAPI_VERSION_KEY);
+ }
+
+ /** Provides the version/tag for dev SDK legacy container image. */
+ public String getLegacyDevContainerVersion() {
+ checkState(
+ properties.containsKey(CONTAINER_LEGACY_VERSION_KEY), "Unknown legacy container version");
+ return properties.get(CONTAINER_LEGACY_VERSION_KEY);
}
/** Provides the version/tag for constructing the container image path. */
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index 58e92ee..956c447 100644
--- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -18,5 +18,6 @@
legacy.environment.major.version=@dataflow.legacy_environment_major_version@
fnapi.environment.major.version=@dataflow.fnapi_environment_major_version@
-container.version=@dataflow.container_version@
+legacy.container.version=@dataflow.legacy_container_version@
+fnapi.container.version=@dataflow.fnapi_container_version@
container.base_repository=@dataflow.container_base_repository@
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
index fbb8ed2..e75af29 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
@@ -49,12 +49,16 @@
String.format("FnAPI environment major version number %s is not a number", version),
version.matches("\\d+"));
- // Validate container version does not contain the property name (indicating it was not filled
- // in).
+ // Validate container versions do not contain the property name.
assertThat(
- "container version invalid",
- info.getContainerVersion(),
- not(containsString("dataflow.container_version")));
+ "legacy container version invalid",
+ info.getFnApiDevContainerVersion(),
+ not(containsString("dataflow.legacy_container_version")));
+
+ assertThat(
+ "FnAPI container version invalid",
+ info.getLegacyDevContainerVersion(),
+ not(containsString("dataflow.fnapi_container_version")));
// Validate container base repository does not contain the property name
// (indicating it was not filled in).
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index f828531..59d105f 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1628,29 +1628,38 @@
@Test
public void testGetContainerImageForJobFromOptionWithPlaceholder() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- // When image option contains placeholder, container image should
- // have placeholder replaced with default image for job.
options.setSdkContainerImage("gcr.io/IMAGE/foo");
- // batch, legacy
- options.setExperiments(null);
- options.setStreaming(false);
- System.setProperty("java.specification.version", "1.8");
- assertThat(getContainerImageForJob(options), equalTo("gcr.io/beam-java-batch/foo"));
- // batch, legacy, jdk11
- options.setStreaming(false);
- System.setProperty("java.specification.version", "11");
- assertThat(getContainerImageForJob(options), equalTo("gcr.io/beam-java11-batch/foo"));
- // streaming, legacy
- System.setProperty("java.specification.version", "1.8");
- options.setStreaming(true);
- assertThat(getContainerImageForJob(options), equalTo("gcr.io/beam-java-streaming/foo"));
- // streaming, legacy, jdk11
- System.setProperty("java.specification.version", "11");
- assertThat(getContainerImageForJob(options), equalTo("gcr.io/beam-java11-streaming/foo"));
- // streaming, fnapi
- options.setExperiments(ImmutableList.of("experiment1", "beam_fn_api"));
- assertThat(getContainerImageForJob(options), equalTo("gcr.io/java/foo"));
+ for (Environments.JavaVersion javaVersion : Environments.JavaVersion.values()) {
+ System.setProperty("java.specification.version", javaVersion.specification());
+ // batch legacy
+ options.setExperiments(null);
+ options.setStreaming(false);
+ assertThat(
+ getContainerImageForJob(options),
+ equalTo(String.format("gcr.io/beam-%s-batch/foo", javaVersion.legacyName())));
+
+ // streaming, legacy
+ options.setExperiments(null);
+ options.setStreaming(true);
+ assertThat(
+ getContainerImageForJob(options),
+ equalTo(String.format("gcr.io/beam-%s-streaming/foo", javaVersion.legacyName())));
+
+ // batch, FnAPI
+ options.setExperiments(ImmutableList.of("beam_fn_api"));
+ options.setStreaming(false);
+ assertThat(
+ getContainerImageForJob(options),
+ equalTo(String.format("gcr.io/beam_%s_sdk/foo", javaVersion.name())));
+
+ // streaming, FnAPI
+ options.setExperiments(ImmutableList.of("beam_fn_api"));
+ options.setStreaming(true);
+ assertThat(
+ getContainerImageForJob(options),
+ equalTo(String.format("gcr.io/beam_%s_sdk/foo", javaVersion.name())));
+ }
}
@Test
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReleaseInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReleaseInfo.java
index c284d462..3f21570 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReleaseInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReleaseInfo.java
@@ -54,16 +54,21 @@
return getProperties().get("name");
}
- /** Provides the BEAM version. ie: 2.18.0-SNAPSHOT */
+ /** Provides the BEAM version e.g. 2.18.0-SNAPSHOT */
public String getVersion() {
return getProperties().get("version");
}
- /** Provides the SDK version. ie: 2.18.0 or 2.18.0.dev */
+ /** Provides the SDK version, e.g. 2.18.0 or 2.18.0.dev */
public String getSdkVersion() {
return getProperties().get("sdk_version");
}
+ /** Returns true if SDK version is a dev version (e.g. 2.18.0.dev) */
+ public boolean isDevSdkVersion() {
+ return getProperties().get("sdk_version").contains("dev");
+ }
+
/** Provides docker image default root (apache). */
public String getDefaultDockerRepoRoot() {
return getProperties().get("docker_image_default_repo_root");