[Java] Disable soft delete policy when creating a default bucket for a project. (#31324)
* Disable soft delete policy when creating a default bucket for a project.
Also, getBucket() and removeBucket() are added into GcsUtil.
* Add random number to prefix to avoid collision on multiple test instances.
* Remove default bucket before creating in the integration test.
* Add one line in CHANGES.md to mention this change and fix a typo.
diff --git a/CHANGES.md b/CHANGES.md
index dd66e91..a6c61da 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -72,6 +72,7 @@
Template variables can be passed with the (json-formatted) `--jinja_variables` flag.
* DataFrame API now supports pandas 2.1.x and adds 12 more string functions for Series.([#31185](https://github.com/apache/beam/pull/31185)).
* Added BigQuery handler for enrichment transform (Python) ([#31295](https://github.com/apache/beam/pull/31295))
+* Disable soft delete policy when creating the default bucket for a project (Java) ([#31324](https://github.com/apache/beam/pull/31324)).
## Breaking Changes
diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle b/sdks/java/extensions/google-cloud-platform-core/build.gradle
index d4dfd46..4af856c 100644
--- a/sdks/java/extensions/google-cloud-platform-core/build.gradle
+++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle
@@ -87,8 +87,32 @@
}
}
+// Note that no runner is specified here, so tests running under this task should not be running
+// pipelines.
+task integrationTestNoKms(type: Test) {
+ group = "Verification"
+ def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+ def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests-cmek'
+ systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+ "--project=${gcpProject}",
+ "--tempRoot=${gcpTempRoot}",
+ ])
+
+ // Disable Gradle cache: these ITs interact with live service that should always be considered "out of date"
+ outputs.upToDateWhen { false }
+
+ include '**/*IT.class'
+ maxParallelForks 4
+ classpath = sourceSets.test.runtimeClasspath
+ testClassesDirs = sourceSets.test.output.classesDirs
+ useJUnit {
+ excludeCategories "org.apache.beam.sdk.testing.UsesKms"
+ }
+}
+
task postCommit {
group = "Verification"
description = "Integration tests of GCP connectors using the DirectRunner."
dependsOn integrationTestKms
+ dependsOn integrationTestNoKms
}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index 3c65f0f..2686a53 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -27,6 +27,7 @@
import com.google.api.services.cloudresourcemanager.CloudResourceManager;
import com.google.api.services.cloudresourcemanager.model.Project;
import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Bucket.SoftDeletePolicy;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
@@ -392,40 +393,67 @@
return tempLocation;
}
- /**
- * Creates a default bucket or verifies the existence and proper access control of an existing
- * default bucket. Returns the location if successful.
- */
@VisibleForTesting
- static String tryCreateDefaultBucket(PipelineOptions options, CloudResourceManager crmClient) {
+ static ImmutableList<String> getDefaultBucketNameStubs(
+ PipelineOptions options, CloudResourceManager crmClient, String bucketNamePrefix) {
GcsOptions gcsOptions = options.as(GcsOptions.class);
- checkArgument(
- isNullOrEmpty(gcsOptions.getDataflowKmsKey()),
- "Cannot create a default bucket when --dataflowKmsKey is set.");
-
final String projectId = gcsOptions.getProject();
checkArgument(!isNullOrEmpty(projectId), "--project is a required option.");
- // Look up the project number, to create a default bucket with a stable
- // name with no special characters.
long projectNumber = 0L;
try {
projectNumber = getProjectNumber(projectId, crmClient);
} catch (IOException e) {
throw new RuntimeException("Unable to verify project with ID " + projectId, e);
}
+
String region = DEFAULT_REGION;
if (!isNullOrEmpty(gcsOptions.getZone())) {
region = getRegionFromZone(gcsOptions.getZone());
}
- final String bucketName = "dataflow-staging-" + region + "-" + projectNumber;
+
+ return ImmutableList.of(bucketNamePrefix, region, String.valueOf(projectNumber));
+ }
+
+ /**
+ * Creates a default bucket or verifies the existence and proper access control of an existing
+ * default bucket. Returns the location if successful.
+ */
+ @VisibleForTesting
+ static String tryCreateDefaultBucket(PipelineOptions options, CloudResourceManager crmClient) {
+ return tryCreateDefaultBucketWithPrefix(options, crmClient, "dataflow-staging");
+ }
+
+ @VisibleForTesting
+ static String tryCreateDefaultBucketWithPrefix(
+ PipelineOptions options, CloudResourceManager crmClient, String bucketNamePrefix) {
+ GcsOptions gcsOptions = options.as(GcsOptions.class);
+
+ checkArgument(
+ isNullOrEmpty(gcsOptions.getDataflowKmsKey()),
+ "Cannot create a default bucket when --dataflowKmsKey is set.");
+
+ final List<String> bucketNameStubs =
+ getDefaultBucketNameStubs(options, crmClient, bucketNamePrefix);
+ final String region = bucketNameStubs.get(1);
+ final long projectNumber = Long.parseLong(bucketNameStubs.get(2));
+ final String bucketName = String.join("-", bucketNameStubs);
LOG.info("No tempLocation specified, attempting to use default bucket: {}", bucketName);
- Bucket bucket = new Bucket().setName(bucketName).setLocation(region);
+
+ // Disable soft delete policy for a bucket.
+ // Reference: https://cloud.google.com/storage/docs/soft-delete
+ SoftDeletePolicy softDeletePolicy = new SoftDeletePolicy().setRetentionDurationSeconds(0L);
+
+ Bucket bucket =
+ new Bucket()
+ .setName(bucketName)
+ .setLocation(region)
+ .setSoftDeletePolicy(softDeletePolicy);
// Always try to create the bucket before checking access, so that we do not
// race with other pipelines that may be attempting to do the same thing.
try {
- gcsOptions.getGcsUtil().createBucket(projectId, bucket);
+ gcsOptions.getGcsUtil().createBucket(gcsOptions.getProject(), bucket);
} catch (FileAlreadyExistsException e) {
LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName);
} catch (IOException e) {
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 0338323..60e8443 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -652,6 +652,17 @@
createBucket(projectId, bucket, createBackOff(), Sleeper.DEFAULT);
}
+ /** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */
+ @Nullable
+ public Bucket getBucket(GcsPath path) throws IOException {
+ return getBucket(path, createBackOff(), Sleeper.DEFAULT);
+ }
+
+ /** Remove an empty {@link Bucket} in Cloud Storage or propagates an exception. */
+ public void removeBucket(Bucket bucket) throws IOException {
+ removeBucket(bucket, createBackOff(), Sleeper.DEFAULT);
+ }
+
/**
* Returns whether the GCS bucket exists. This will return false if the bucket is inaccessible due
* to permissions.
@@ -753,6 +764,40 @@
}
}
+ @VisibleForTesting
+ void removeBucket(Bucket bucket, BackOff backoff, Sleeper sleeper) throws IOException {
+ Storage.Buckets.Delete getBucket = storageClient.buckets().delete(bucket.getName());
+
+ try {
+ ResilientOperation.retry(
+ getBucket::execute,
+ backoff,
+ new RetryDeterminer<IOException>() {
+ @Override
+ public boolean shouldRetry(IOException e) {
+ if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
+ return false;
+ }
+ return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
+ }
+ },
+ IOException.class,
+ sleeper);
+ } catch (GoogleJsonResponseException e) {
+ if (errorExtractor.accessDenied(e)) {
+ throw new AccessDeniedException(bucket.getName(), null, e.getMessage());
+ }
+ if (errorExtractor.itemNotFound(e)) {
+ throw new FileNotFoundException(e.getMessage());
+ }
+ throw e;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ String.format("Error while attempting to remove bucket gs://%s", bucket.getName()), e);
+ }
+ }
+
private static void executeBatches(List<BatchInterface> batches) throws IOException {
ExecutorService executor =
MoreExecutors.listeningDecorator(
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsIT.java
new file mode 100644
index 0000000..138bb2f
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.storage.model.Bucket;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Random;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link GcpOptions}. These tests are designed to run against production
+ * Google Cloud Storage.
+ *
+ * <p>This is a runnerless integration test, even though the Beam IT framework assumes one. Thus,
+ * this test should only be run against single runner (such as DirectRunner).
+ */
+@RunWith(JUnit4.class)
+public class GcpOptionsIT {
+ /** Tests the creation of a default bucket in a project. */
+ @Test
+ public void testCreateDefaultBucket() throws IOException {
+ TestPipelineOptions options =
+ TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+
+ CloudResourceManager crmClient =
+ GcpOptions.GcpTempLocationFactory.newCloudResourceManagerClient(
+ options.as(CloudResourceManagerOptions.class))
+ .build();
+
+ GcsOptions gcsOptions = options.as(GcsOptions.class);
+ GcsUtil gcsUtil = gcsOptions.getGcsUtil();
+
+ Random rand = new Random();
+ // Add a random number to the prefix to avoid collision if multiple test instances
+ // are run at the same time. To avoid too many dangling buckets if bucket removal fails,
+ // we limit the max number of possible bucket names in this test to 1000.
+ String bucketNamePrefix = "gcp-options-it-" + rand.nextInt(1000);
+
+ String bucketName =
+ String.join(
+ "-",
+ GcpOptions.GcpTempLocationFactory.getDefaultBucketNameStubs(
+ options, crmClient, bucketNamePrefix));
+
+ // remove existing default bucket if any
+ try {
+ Bucket oldBucket = gcsUtil.getBucket(GcsPath.fromUri("gs://" + bucketName));
+ gcsUtil.removeBucket(oldBucket);
+ } catch (FileNotFoundException e) {
+ // the bucket to be created does not exist, which is good news
+ }
+
+ String tempLocation =
+ GcpOptions.GcpTempLocationFactory.tryCreateDefaultBucketWithPrefix(
+ options, crmClient, bucketNamePrefix);
+
+ GcsPath gcsPath = GcsPath.fromUri(tempLocation);
+ Bucket bucket = gcsUtil.getBucket(gcsPath);
+ assertNotNull(bucket);
+ // verify the soft delete policy is disabled
+ assertEquals(bucket.getSoftDeletePolicy().getRetentionDurationSeconds(), Long.valueOf(0L));
+
+ gcsUtil.removeBucket(bucket);
+ assertThrows(FileNotFoundException.class, () -> gcsUtil.getBucket(gcsPath));
+ }
+}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
index bf30b4c..a182f0a 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
@@ -21,10 +21,13 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.api.services.cloudresourcemanager.CloudResourceManager;
@@ -56,6 +59,7 @@
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -230,6 +234,14 @@
String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
assertEquals("gs://dataflow-staging-us-north1-1/temp/", bucket);
+
+ ArgumentCaptor<Bucket> bucketArg = ArgumentCaptor.forClass(Bucket.class);
+ verify(mockGcsUtil, times(1)).createBucket(anyString(), bucketArg.capture());
+
+ // verify that the soft delete policy is disabled in the default bucket
+ assertEquals(
+ bucketArg.getValue().getSoftDeletePolicy().getRetentionDurationSeconds(),
+ Long.valueOf(0L));
}
@Test