Merge pull request #14874 from emilymye/release-2.30.0

[BEAM-12337] Cherrypick #14814: Make default UW image for java beam_java8/11_sdk
diff --git a/release/src/main/scripts/set_version.sh b/release/src/main/scripts/set_version.sh
index 1789f8e..72af526 100755
--- a/release/src/main/scripts/set_version.sh
+++ b/release/src/main/scripts/set_version.sh
@@ -77,7 +77,8 @@
   sed -i -e "s/sdk_version=.*/sdk_version=$TARGET_VERSION/" gradle.properties
   sed -i -e "s/SdkVersion = .*/SdkVersion = \"$TARGET_VERSION\"/" sdks/go/pkg/beam/core/core.go
   # TODO: [BEAM-4767]
-  sed -i -e "s/'dataflow.container_version' : .*/'dataflow.container_version' : 'beam-${TARGET_VERSION}'/" runners/google-cloud-dataflow-java/build.gradle
+  sed -i -e "s/'dataflow.fnapi_container_version' : .*/'dataflow.fnapi_container_version' : 'beam-${TARGET_VERSION}'/" runners/google-cloud-dataflow-java/build.gradle
+  sed -i -e "s/'dataflow.legacy_container_version' : .*/'dataflow.legacy_container_version' : 'beam-${TARGET_VERSION}'/" runners/google-cloud-dataflow-java/build.gradle
 else
   # For snapshot version:
   #   Java/gradle appends -SNAPSHOT
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index 222818d..a9232e3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -90,20 +90,22 @@
           .build();
 
   public enum JavaVersion {
-    v8("java8", "1.8"),
-    v11("java11", "11");
+    java8("java", "1.8"),
+    java11("java11", "11");
 
-    private final String name;
+    // Legacy name, as used in container image
+    private final String legacyName;
+
+    // Specification version (e.g. System java.specification.version)
     private final String specification;
 
-    JavaVersion(final String name, final String specification) {
-      this.name = name;
+    JavaVersion(final String legacyName, final String specification) {
+      this.legacyName = legacyName;
       this.specification = specification;
     }
 
-    @Override
-    public String toString() {
-      return this.name;
+    public String legacyName() {
+      return this.legacyName;
     }
 
     public String specification() {
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
index 082aac6..d980d4a 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
@@ -254,14 +254,14 @@
 
   @Test
   public void testJavaVersion() {
-    assertEquals(JavaVersion.v8, JavaVersion.forSpecification("1.8"));
-    assertEquals("java8", JavaVersion.v8.toString());
-    assertEquals(JavaVersion.v11, JavaVersion.forSpecification("11"));
-    assertEquals("java11", JavaVersion.v11.toString());
+    assertEquals(JavaVersion.java8, JavaVersion.forSpecification("1.8"));
+    assertEquals("java", JavaVersion.java8.legacyName());
+    assertEquals(JavaVersion.java11, JavaVersion.forSpecification("11"));
+    assertEquals("java11", JavaVersion.java11.legacyName());
   }
 
   @Test(expected = UnsupportedOperationException.class)
   public void testJavaVersionInvalid() {
-    assertEquals(JavaVersion.v8, JavaVersion.forSpecification("invalid"));
+    assertEquals(JavaVersion.java8, JavaVersion.forSpecification("invalid"));
   }
 }
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index 240318f..6175c30 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -45,7 +45,8 @@
   filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [
     'dataflow.legacy_environment_major_version' : '8',
     'dataflow.fnapi_environment_major_version' : '8',
-    'dataflow.container_version' : 'beam-2.30.0',
+    'dataflow.legacy_container_version' : 'beam-2.30.0',
+    'dataflow.fnapi_container_version' : 'beam-2.30.0',
     'dataflow.container_base_repository' : 'gcr.io/cloud-dataflow/v1beta3',
   ]
 }
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 68363a8..fa7c1ec 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -148,6 +148,7 @@
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.KV;
@@ -382,7 +383,7 @@
 
     // Adding the Java version to the SDK name for user's and support convenience.
     String agentJavaVer =
-        (Environments.getJavaVersion() == Environments.JavaVersion.v8)
+        (Environments.getJavaVersion() == Environments.JavaVersion.java8)
             ? "(JRE 8 environment)"
             : "(JDK 11 environment)";
 
@@ -2181,7 +2182,6 @@
       return getDefaultContainerImageUrl(options);
     } else if (containerImage.contains("IMAGE")) {
       // Replace placeholder with default image name
-      // TODO(emilymye): See if we can remove this placeholder
       return containerImage.replace("IMAGE", getDefaultContainerImageNameForJob(options));
     } else {
       return containerImage;
@@ -2195,27 +2195,38 @@
         "%s/%s:%s",
         dataflowRunnerInfo.getContainerImageBaseRepository(),
         getDefaultContainerImageNameForJob(options),
-        dataflowRunnerInfo.getContainerVersion());
+        getDefaultContainerVersion(options));
   }
 
   /**
-   * Construct the default Dataflow container image name based on pipeline type and Environment Java
-   * version.
+   * Construct the default Dataflow container image name based on pipeline type and Java version.
    */
   static String getDefaultContainerImageNameForJob(DataflowPipelineOptions options) {
     Environments.JavaVersion javaVersion = Environments.getJavaVersion();
-    String legacyJavaVersionId =
-        (javaVersion == Environments.JavaVersion.v8) ? "java" : javaVersion.toString();
-
     if (useUnifiedWorker(options)) {
-      return "java";
+      return String.format("beam_%s_sdk", javaVersion.name());
     } else if (options.isStreaming()) {
-      return String.format("beam-%s-streaming", legacyJavaVersionId);
+      return String.format("beam-%s-streaming", javaVersion.legacyName());
     } else {
-      return String.format("beam-%s-batch", legacyJavaVersionId);
+      return String.format("beam-%s-batch", javaVersion.legacyName());
     }
   }
 
+  /**
+   * Returns the default container tag: the dev container version for dev SDKs, else SDK version.
+   */
+  static String getDefaultContainerVersion(DataflowPipelineOptions options) {
+    DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
+    ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
+    if (releaseInfo.isDevSdkVersion()) {
+      if (useUnifiedWorker(options)) {
+        return dataflowRunnerInfo.getFnApiDevContainerVersion();
+      }
+      return dataflowRunnerInfo.getLegacyDevContainerVersion();
+    }
+    return releaseInfo.getSdkVersion();
+  }
+
   static boolean useUnifiedWorker(DataflowPipelineOptions options) {
     return hasExperiment(options, "beam_fn_api")
         || hasExperiment(options, "use_runner_v2")
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
index 4c5b272..fed503d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
@@ -41,7 +41,8 @@
       "fnapi.environment.major.version";
   private static final String LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY =
       "legacy.environment.major.version";
-  private static final String CONTAINER_VERSION_KEY = "container.version";
+  private static final String CONTAINER_FNAPI_VERSION_KEY = "fnapi.container.version";
+  private static final String CONTAINER_LEGACY_VERSION_KEY = "legacy.container.version";
   private static final String CONTAINER_BASE_REPOSITORY_KEY = "container.base_repository";
 
   private static class LazyInit {
@@ -100,10 +101,18 @@
     return properties.get(FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY);
   }
 
-  /** Provides the version/tag for constructing the container image path. */
-  public String getContainerVersion() {
-    checkState(properties.containsKey(CONTAINER_VERSION_KEY), "Unknown container version");
-    return properties.get(CONTAINER_VERSION_KEY);
+  /** Provides the version/tag for dev SDK FnAPI container image. */
+  public String getFnApiDevContainerVersion() {
+    checkState(
+        properties.containsKey(CONTAINER_FNAPI_VERSION_KEY), "Unknown FnAPI container version");
+    return properties.get(CONTAINER_FNAPI_VERSION_KEY);
+  }
+
+  /** Provides the version/tag for dev SDK legacy container image. */
+  public String getLegacyDevContainerVersion() {
+    checkState(
+        properties.containsKey(CONTAINER_LEGACY_VERSION_KEY), "Unknown legacy container version");
+    return properties.get(CONTAINER_LEGACY_VERSION_KEY);
   }
 
   /** Provides the version/tag for constructing the container image path. */
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index 58e92ee..956c447 100644
--- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -18,5 +18,6 @@
 
 legacy.environment.major.version=@dataflow.legacy_environment_major_version@
 fnapi.environment.major.version=@dataflow.fnapi_environment_major_version@
-container.version=@dataflow.container_version@
+legacy.container.version=@dataflow.legacy_container_version@
+fnapi.container.version=@dataflow.fnapi_container_version@
 container.base_repository=@dataflow.container_base_repository@
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
index fbb8ed2..e75af29 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
@@ -49,12 +49,16 @@
         String.format("FnAPI environment major version number %s is not a number", version),
         version.matches("\\d+"));
 
-    // Validate container version does not contain the property name (indicating it was not filled
-    // in).
+    // Validate container versions do not contain the property name (i.e. it was filled in).
     assertThat(
-        "container version invalid",
-        info.getContainerVersion(),
-        not(containsString("dataflow.container_version")));
+        "legacy container version invalid",
+        info.getLegacyDevContainerVersion(),
+        not(containsString("dataflow.legacy_container_version")));
+
+    assertThat(
+        "FnAPI container version invalid",
+        info.getFnApiDevContainerVersion(),
+        not(containsString("dataflow.fnapi_container_version")));
 
     // Validate container base repository does not contain the property name
     // (indicating it was not filled in).
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index f828531..59d105f 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1628,29 +1628,38 @@
   @Test
   public void testGetContainerImageForJobFromOptionWithPlaceholder() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    // When image option contains placeholder, container image should
-    // have placeholder replaced with default image for job.
     options.setSdkContainerImage("gcr.io/IMAGE/foo");
 
-    // batch, legacy
-    options.setExperiments(null);
-    options.setStreaming(false);
-    System.setProperty("java.specification.version", "1.8");
-    assertThat(getContainerImageForJob(options), equalTo("gcr.io/beam-java-batch/foo"));
-    // batch, legacy, jdk11
-    options.setStreaming(false);
-    System.setProperty("java.specification.version", "11");
-    assertThat(getContainerImageForJob(options), equalTo("gcr.io/beam-java11-batch/foo"));
-    // streaming, legacy
-    System.setProperty("java.specification.version", "1.8");
-    options.setStreaming(true);
-    assertThat(getContainerImageForJob(options), equalTo("gcr.io/beam-java-streaming/foo"));
-    // streaming, legacy, jdk11
-    System.setProperty("java.specification.version", "11");
-    assertThat(getContainerImageForJob(options), equalTo("gcr.io/beam-java11-streaming/foo"));
-    // streaming, fnapi
-    options.setExperiments(ImmutableList.of("experiment1", "beam_fn_api"));
-    assertThat(getContainerImageForJob(options), equalTo("gcr.io/java/foo"));
+    for (Environments.JavaVersion javaVersion : Environments.JavaVersion.values()) {
+      System.setProperty("java.specification.version", javaVersion.specification());
+      // batch, legacy
+      options.setExperiments(null);
+      options.setStreaming(false);
+      assertThat(
+          getContainerImageForJob(options),
+          equalTo(String.format("gcr.io/beam-%s-batch/foo", javaVersion.legacyName())));
+
+      // streaming, legacy
+      options.setExperiments(null);
+      options.setStreaming(true);
+      assertThat(
+          getContainerImageForJob(options),
+          equalTo(String.format("gcr.io/beam-%s-streaming/foo", javaVersion.legacyName())));
+
+      // batch, FnAPI
+      options.setExperiments(ImmutableList.of("beam_fn_api"));
+      options.setStreaming(false);
+      assertThat(
+          getContainerImageForJob(options),
+          equalTo(String.format("gcr.io/beam_%s_sdk/foo", javaVersion.name())));
+
+      // streaming, FnAPI
+      options.setExperiments(ImmutableList.of("beam_fn_api"));
+      options.setStreaming(true);
+      assertThat(
+          getContainerImageForJob(options),
+          equalTo(String.format("gcr.io/beam_%s_sdk/foo", javaVersion.name())));
+    }
   }
 
   @Test
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReleaseInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReleaseInfo.java
index c284d462..3f21570 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReleaseInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReleaseInfo.java
@@ -54,16 +54,21 @@
     return getProperties().get("name");
   }
 
-  /** Provides the BEAM version. ie: 2.18.0-SNAPSHOT */
+  /** Provides the BEAM version e.g. 2.18.0-SNAPSHOT */
   public String getVersion() {
     return getProperties().get("version");
   }
 
-  /** Provides the SDK version. ie: 2.18.0 or 2.18.0.dev */
+  /** Provides the SDK version. e.g. 2.18.0 or 2.18.0.dev */
   public String getSdkVersion() {
     return getProperties().get("sdk_version");
   }
 
+  /** Returns true if SDK version is a dev version (e.g. 2.18.0.dev) */
+  public boolean isDevSdkVersion() {
+    return getProperties().get("sdk_version").contains("dev");
+  }
+
   /** Provides docker image default root (apache). */
   public String getDefaultDockerRepoRoot() {
     return getProperties().get("docker_image_default_repo_root");