[Java] Disable soft delete policy when creating a default bucket for a project. (#31324)

* Disable soft delete policy when creating a default bucket for a project.

Also, getBucket() and removeBucket() are added into GcsUtil.

* Add random number to prefix to avoid collision on multiple test instances.

* Remove default bucket before creating in the integration test.

* Add one line in CHANGES.md to mention this change and fix a typo.
diff --git a/CHANGES.md b/CHANGES.md
index dd66e91..a6c61da 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -72,6 +72,7 @@
   Template variables can be passed with the (json-formatted) `--jinja_variables` flag.
 * DataFrame API now supports pandas 2.1.x and adds 12 more string functions for Series.([#31185](https://github.com/apache/beam/pull/31185)).
 * Added BigQuery handler for enrichment transform (Python) ([#31295](https://github.com/apache/beam/pull/31295))
+* Disable soft delete policy when creating the default bucket for a project (Java) ([#31324](https://github.com/apache/beam/pull/31324)).
 
 ## Breaking Changes
 
diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle b/sdks/java/extensions/google-cloud-platform-core/build.gradle
index d4dfd46..4af856c 100644
--- a/sdks/java/extensions/google-cloud-platform-core/build.gradle
+++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle
@@ -87,8 +87,32 @@
   }
 }
 
+// Note that no runner is specified here, so tests running under this task should not be running
+// pipelines.
+task integrationTestNoKms(type: Test) {
+  group = "Verification"
+  def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+  def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests-cmek'
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+          "--project=${gcpProject}",
+          "--tempRoot=${gcpTempRoot}",
+  ])
+
+  // Disable Gradle cache: these ITs interact with a live service that should always be considered "out of date"
+  outputs.upToDateWhen { false }
+
+  include '**/*IT.class'
+  maxParallelForks 4
+  classpath = sourceSets.test.runtimeClasspath
+  testClassesDirs = sourceSets.test.output.classesDirs
+  useJUnit {
+    excludeCategories "org.apache.beam.sdk.testing.UsesKms"
+  }
+}
+
 task postCommit {
   group = "Verification"
   description = "Integration tests of GCP connectors using the DirectRunner."
   dependsOn integrationTestKms
+  dependsOn integrationTestNoKms
 }
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index 3c65f0f..2686a53 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -27,6 +27,7 @@
 import com.google.api.services.cloudresourcemanager.CloudResourceManager;
 import com.google.api.services.cloudresourcemanager.model.Project;
 import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Bucket.SoftDeletePolicy;
 import com.google.auth.Credentials;
 import com.google.auth.http.HttpCredentialsAdapter;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
@@ -392,40 +393,67 @@
       return tempLocation;
     }
 
-    /**
-     * Creates a default bucket or verifies the existence and proper access control of an existing
-     * default bucket. Returns the location if successful.
-     */
     @VisibleForTesting
-    static String tryCreateDefaultBucket(PipelineOptions options, CloudResourceManager crmClient) {
+    static ImmutableList<String> getDefaultBucketNameStubs(
+        PipelineOptions options, CloudResourceManager crmClient, String bucketNamePrefix) {
       GcsOptions gcsOptions = options.as(GcsOptions.class);
 
-      checkArgument(
-          isNullOrEmpty(gcsOptions.getDataflowKmsKey()),
-          "Cannot create a default bucket when --dataflowKmsKey is set.");
-
       final String projectId = gcsOptions.getProject();
       checkArgument(!isNullOrEmpty(projectId), "--project is a required option.");
 
-      // Look up the project number, to create a default bucket with a stable
-      // name with no special characters.
       long projectNumber = 0L;
       try {
         projectNumber = getProjectNumber(projectId, crmClient);
       } catch (IOException e) {
         throw new RuntimeException("Unable to verify project with ID " + projectId, e);
       }
+
       String region = DEFAULT_REGION;
       if (!isNullOrEmpty(gcsOptions.getZone())) {
         region = getRegionFromZone(gcsOptions.getZone());
       }
-      final String bucketName = "dataflow-staging-" + region + "-" + projectNumber;
+
+      return ImmutableList.of(bucketNamePrefix, region, String.valueOf(projectNumber));
+    }
+
+    /**
+     * Creates a default bucket or verifies the existence and proper access control of an existing
+     * default bucket. Returns the location if successful.
+     */
+    @VisibleForTesting
+    static String tryCreateDefaultBucket(PipelineOptions options, CloudResourceManager crmClient) {
+      return tryCreateDefaultBucketWithPrefix(options, crmClient, "dataflow-staging");
+    }
+
+    @VisibleForTesting
+    static String tryCreateDefaultBucketWithPrefix(
+        PipelineOptions options, CloudResourceManager crmClient, String bucketNamePrefix) {
+      GcsOptions gcsOptions = options.as(GcsOptions.class);
+
+      checkArgument(
+          isNullOrEmpty(gcsOptions.getDataflowKmsKey()),
+          "Cannot create a default bucket when --dataflowKmsKey is set.");
+
+      final List<String> bucketNameStubs =
+          getDefaultBucketNameStubs(options, crmClient, bucketNamePrefix);
+      final String region = bucketNameStubs.get(1);
+      final long projectNumber = Long.parseLong(bucketNameStubs.get(2));
+      final String bucketName = String.join("-", bucketNameStubs);
       LOG.info("No tempLocation specified, attempting to use default bucket: {}", bucketName);
-      Bucket bucket = new Bucket().setName(bucketName).setLocation(region);
+
+      // Disable soft delete policy for a bucket.
+      // Reference: https://cloud.google.com/storage/docs/soft-delete
+      SoftDeletePolicy softDeletePolicy = new SoftDeletePolicy().setRetentionDurationSeconds(0L);
+
+      Bucket bucket =
+          new Bucket()
+              .setName(bucketName)
+              .setLocation(region)
+              .setSoftDeletePolicy(softDeletePolicy);
       // Always try to create the bucket before checking access, so that we do not
       // race with other pipelines that may be attempting to do the same thing.
       try {
-        gcsOptions.getGcsUtil().createBucket(projectId, bucket);
+        gcsOptions.getGcsUtil().createBucket(gcsOptions.getProject(), bucket);
       } catch (FileAlreadyExistsException e) {
         LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName);
       } catch (IOException e) {
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 0338323..60e8443 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -652,6 +652,17 @@
     createBucket(projectId, bucket, createBackOff(), Sleeper.DEFAULT);
   }
 
+  /** Gets the {@link Bucket} from a Cloud Storage path or propagates an exception. */
+  @Nullable
+  public Bucket getBucket(GcsPath path) throws IOException {
+    return getBucket(path, createBackOff(), Sleeper.DEFAULT);
+  }
+
+  /** Removes an empty {@link Bucket} in Cloud Storage or propagates an exception. */
+  public void removeBucket(Bucket bucket) throws IOException {
+    removeBucket(bucket, createBackOff(), Sleeper.DEFAULT);
+  }
+
   /**
    * Returns whether the GCS bucket exists. This will return false if the bucket is inaccessible due
    * to permissions.
@@ -753,6 +764,40 @@
     }
   }
 
+  @VisibleForTesting
+  void removeBucket(Bucket bucket, BackOff backoff, Sleeper sleeper) throws IOException {
+    Storage.Buckets.Delete deleteBucket = storageClient.buckets().delete(bucket.getName());
+
+    try {
+      ResilientOperation.retry(
+          deleteBucket::execute,
+          backoff,
+          new RetryDeterminer<IOException>() {
+            @Override
+            public boolean shouldRetry(IOException e) {
+              if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
+                return false; // Not retried; translated to a specific exception below.
+              }
+              return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
+            }
+          },
+          IOException.class,
+          sleeper);
+    } catch (GoogleJsonResponseException e) {
+      if (errorExtractor.accessDenied(e)) {
+        throw new AccessDeniedException(bucket.getName(), null, e.getMessage());
+      }
+      if (errorExtractor.itemNotFound(e)) {
+        throw new FileNotFoundException(e.getMessage());
+      }
+      throw e;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(
+          String.format("Error while attempting to remove bucket gs://%s", bucket.getName()), e);
+    }
+  }
+
   private static void executeBatches(List<BatchInterface> batches) throws IOException {
     ExecutorService executor =
         MoreExecutors.listeningDecorator(
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsIT.java
new file mode 100644
index 0000000..138bb2f
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.storage.model.Bucket;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Random;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link GcpOptions}. These tests are designed to run against production
+ * Google Cloud Storage.
+ *
+ * <p>This is a runnerless integration test, even though the Beam IT framework assumes one. Thus,
+ * this test should only be run against a single runner (such as DirectRunner).
+ */
+@RunWith(JUnit4.class)
+public class GcpOptionsIT {
+  /** Tests the creation of a default bucket in a project. */
+  @Test
+  public void testCreateDefaultBucket() throws IOException {
+    TestPipelineOptions options =
+        TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+
+    CloudResourceManager crmClient =
+        GcpOptions.GcpTempLocationFactory.newCloudResourceManagerClient(
+                options.as(CloudResourceManagerOptions.class))
+            .build();
+
+    GcsOptions gcsOptions = options.as(GcsOptions.class);
+    GcsUtil gcsUtil = gcsOptions.getGcsUtil();
+
+    Random rand = new Random();
+    // Add a random number to the prefix to avoid collision if multiple test instances
+    // are run at the same time. To avoid too many dangling buckets if bucket removal fails,
+    // we limit the max number of possible bucket names in this test to 1000.
+    String bucketNamePrefix = "gcp-options-it-" + rand.nextInt(1000);
+
+    String bucketName =
+        String.join(
+            "-",
+            GcpOptions.GcpTempLocationFactory.getDefaultBucketNameStubs(
+                options, crmClient, bucketNamePrefix));
+
+    // remove existing default bucket if any
+    try {
+      Bucket oldBucket = gcsUtil.getBucket(GcsPath.fromUri("gs://" + bucketName));
+      gcsUtil.removeBucket(oldBucket);
+    } catch (FileNotFoundException e) {
+      // the bucket to be created does not exist, which is good news
+    }
+
+    String tempLocation =
+        GcpOptions.GcpTempLocationFactory.tryCreateDefaultBucketWithPrefix(
+            options, crmClient, bucketNamePrefix);
+
+    GcsPath gcsPath = GcsPath.fromUri(tempLocation);
+    Bucket bucket = gcsUtil.getBucket(gcsPath);
+    assertNotNull(bucket);
+    // verify the soft delete policy is disabled (expected value first, per JUnit convention)
+    assertEquals(Long.valueOf(0L), bucket.getSoftDeletePolicy().getRetentionDurationSeconds());
+
+    gcsUtil.removeBucket(bucket);
+    assertThrows(FileNotFoundException.class, () -> gcsUtil.getBucket(gcsPath));
+  }
+}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
index bf30b4c..a182f0a 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
@@ -21,10 +21,13 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.api.services.cloudresourcemanager.CloudResourceManager;
@@ -56,6 +59,7 @@
 import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
@@ -230,6 +234,14 @@
 
       String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
       assertEquals("gs://dataflow-staging-us-north1-1/temp/", bucket);
+
+      ArgumentCaptor<Bucket> bucketArg = ArgumentCaptor.forClass(Bucket.class);
+      verify(mockGcsUtil, times(1)).createBucket(anyString(), bucketArg.capture());
+
+      // verify that the soft delete policy is disabled in the default bucket
+      assertEquals(
+          bucketArg.getValue().getSoftDeletePolicy().getRetentionDurationSeconds(),
+          Long.valueOf(0L));
     }
 
     @Test