[BEAM-12740] Add option to CreateOptions to avoid GetObjectMetadata for creating unique temporary GCS file.
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index f5bde8e..a212fd1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -39,9 +39,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
@@ -267,7 +265,7 @@
private static GcsUtil buildMockGcsUtil() throws IOException {
GcsUtil mockGcsUtil = mock(GcsUtil.class);
- when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+ when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
.then(
invocation ->
FileChannel.open(
@@ -276,7 +274,7 @@
StandardOpenOption.WRITE,
StandardOpenOption.DELETE_ON_CLOSE));
- when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt()))
+ when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
.then(
invocation ->
FileChannel.open(
@@ -882,7 +880,7 @@
options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());
- when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt()))
+ when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
.then(
invocation ->
FileChannel.open(
@@ -950,7 +948,7 @@
options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());
- when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt()))
+ when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
.then(
invocation ->
FileChannel.open(
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 5c6e813..ca5dd51 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -27,7 +27,6 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Mockito.times;
@@ -284,7 +283,8 @@
ImmutableList.of(
StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
- when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+ when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
+ .thenReturn(pipe.sink());
List<DataflowPackage> targets =
defaultPackageUtil.stageClasspathElements(
@@ -294,7 +294,7 @@
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
- verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+ verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
verifyNoMoreInteractions(mockGcsUtil);
assertThat(target.getName(), endsWith(".txt"));
@@ -313,7 +313,7 @@
ImmutableList.of(
StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
- when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+ when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
.thenAnswer(invocation -> Pipe.open().sink());
List<DataflowPackage> targets =
@@ -342,7 +342,8 @@
.thenReturn(
ImmutableList.of(
StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
- when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+ when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
+ .thenReturn(pipe.sink());
defaultPackageUtil.stageClasspathElements(
ImmutableList.of(makeStagedFile(tmpDirectory.getAbsolutePath())),
@@ -350,7 +351,7 @@
createOptions);
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
- verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+ verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
verifyNoMoreInteractions(mockGcsUtil);
List<String> zipEntryNames = new ArrayList<>();
@@ -375,7 +376,8 @@
.thenReturn(
ImmutableList.of(
StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
- when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+ when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
+ .thenReturn(pipe.sink());
List<DataflowPackage> targets =
defaultPackageUtil.stageClasspathElements(
@@ -385,7 +387,7 @@
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
- verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+ verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
verifyNoMoreInteractions(mockGcsUtil);
assertThat(target.getName(), endsWith(".jar"));
@@ -403,7 +405,7 @@
.thenReturn(
ImmutableList.of(
StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
- when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+ when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
.thenThrow(new IOException("Fake Exception: Upload error"));
try (PackageUtil directPackageUtil =
@@ -415,7 +417,7 @@
createOptions);
} finally {
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
- verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString());
+ verify(mockGcsUtil, times(5)).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
verifyNoMoreInteractions(mockGcsUtil);
}
}
@@ -427,7 +429,7 @@
.thenReturn(
ImmutableList.of(
StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
- when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+ when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
.thenThrow(
new IOException(
"Failed to write to GCS path " + STAGING_PATH,
@@ -460,7 +462,7 @@
+ "login'")));
} finally {
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
- verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+ verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
verifyNoMoreInteractions(mockGcsUtil);
}
}
@@ -473,7 +475,7 @@
.thenReturn(
ImmutableList.of(
StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
- when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+ when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
.thenThrow(new IOException("Fake Exception: 410 Gone")) // First attempt fails
.thenReturn(pipe.sink()); // second attempt succeeds
@@ -486,7 +488,7 @@
createOptions);
} finally {
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
- verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
+ verify(mockGcsUtil, times(2)).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
verifyNoMoreInteractions(mockGcsUtil);
}
}
@@ -520,7 +522,8 @@
ImmutableList.of(
StorageObjectOrIOException.create(
createStorageObject(STAGING_PATH, Long.MAX_VALUE))));
- when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+ when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
+ .thenReturn(pipe.sink());
defaultPackageUtil.stageClasspathElements(
ImmutableList.of(makeStagedFile(tmpDirectory.getAbsolutePath())),
@@ -528,7 +531,7 @@
createOptions);
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
- verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+ verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
verifyNoMoreInteractions(mockGcsUtil);
}
@@ -542,7 +545,8 @@
.thenReturn(
ImmutableList.of(
StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
- when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+ when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
+ .thenReturn(pipe.sink());
List<DataflowPackage> targets =
defaultPackageUtil.stageClasspathElements(
@@ -552,7 +556,7 @@
DataflowPackage target = Iterables.getOnlyElement(targets);
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
- verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+ verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
verifyNoMoreInteractions(mockGcsUtil);
assertThat(target.getName(), equalTo(overriddenName));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index f735ec3..6c3b176 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -50,6 +50,8 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
@@ -948,7 +950,16 @@
getWriteOperation().getSink().writableByteChannelFactory;
// The factory may force a MIME type or it may return null, indicating to use the sink's MIME.
String channelMimeType = firstNonNull(factory.getMimeType(), mimeType);
- WritableByteChannel tempChannel = FileSystems.create(outputFile, channelMimeType);
+ CreateOptions createOptions =
+ StandardCreateOptions.builder()
+ .setMimeType(channelMimeType)
+ // The file is based upon a uuid and thus we expect it to be unique and to not already
+ // exist. A new uuid is generated on each bundle processing and thus this also holds
+ // across bundle retries. Collisions of filenames would result in data loss as we
+ // would otherwise overwrite already finalized data.
+ .setExpectFileToNotExist(true)
+ .build();
+ WritableByteChannel tempChannel = FileSystems.create(outputFile, createOptions);
try {
channel = factory.create(tempChannel);
} catch (Exception e) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/CreateOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/CreateOptions.java
index 331e66f..85d2311 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/CreateOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/CreateOptions.java
@@ -24,9 +24,14 @@
/** The file-like resource mime type. */
public abstract String mimeType();
+ /** True if the file is expected to not exist. */
+ public abstract Boolean expectFileToNotExist();
+
/** An abstract builder for {@link CreateOptions}. */
public abstract static class Builder<BuilderT extends CreateOptions.Builder<BuilderT>> {
public abstract BuilderT setMimeType(String value);
+
+ public abstract BuilderT setExpectFileToNotExist(Boolean value);
}
/** A standard configuration options with builder. */
@@ -35,7 +40,8 @@
/** Returns a {@link StandardCreateOptions.Builder}. */
public static StandardCreateOptions.Builder builder() {
- return new AutoValue_CreateOptions_StandardCreateOptions.Builder();
+ return new AutoValue_CreateOptions_StandardCreateOptions.Builder()
+ .setExpectFileToNotExist(false);
}
/** Builder for {@link StandardCreateOptions}. */
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java
index 1fdb871..ce534d6 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java
@@ -37,7 +37,7 @@
/** Returns a {@link GcsCreateOptions.Builder}. */
public static GcsCreateOptions.Builder builder() {
- return new AutoValue_GcsCreateOptions.Builder();
+ return new AutoValue_GcsCreateOptions.Builder().setExpectFileToNotExist(false);
}
/** A builder for {@link GcsCreateOptions}. */
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
index 6c39428..a547bc9 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
@@ -133,16 +133,16 @@
@Override
protected WritableByteChannel create(GcsResourceId resourceId, CreateOptions createOptions)
throws IOException {
+ GcsUtil.CreateOptions.Builder builder =
+ GcsUtil.CreateOptions.builder()
+ .setContentType(createOptions.mimeType())
+ .setExpectFileToNotExist(createOptions.expectFileToNotExist());
if (createOptions instanceof GcsCreateOptions) {
- return options
- .getGcsUtil()
- .create(
- resourceId.getGcsPath(),
- createOptions.mimeType(),
+ builder =
+ builder.setUploadBufferSizeBytes(
((GcsCreateOptions) createOptions).gcsUploadBufferSizeBytes());
- } else {
- return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType());
}
+ return options.getGcsUtil().create(resourceId.getGcsPath(), builder.build());
}
@Override
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index f6ad78f..668d94b 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -485,35 +485,93 @@
}
}
- /**
- * Creates an object in GCS.
- *
- * <p>Returns a WritableByteChannel that can be used to write data to the object.
- *
- * @param path the GCS file to write to
- * @param type the type of object, eg "text/plain".
- * @return a Callable object that encloses the operation.
- */
+ /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
+ @Deprecated
public WritableByteChannel create(GcsPath path, String type) throws IOException {
- return create(path, type, uploadBufferSizeBytes);
+ CreateOptions.Builder builder = CreateOptions.builder().setContentType(type);
+ return create(path, builder.build());
+ }
+
+ /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
+ @Deprecated
+ public WritableByteChannel create(GcsPath path, String type, Integer uploadBufferSizeBytes)
+ throws IOException {
+ CreateOptions.Builder builder =
+ CreateOptions.builder()
+ .setContentType(type)
+ .setUploadBufferSizeBytes(uploadBufferSizeBytes);
+ return create(path, builder.build());
+ }
+
+ @AutoValue
+ public abstract static class CreateOptions {
+ /**
+ * If true, the created file is expected to not exist. Instead of checking for file presence
+ * before writing a write exception may occur if the file does exist.
+ */
+ public abstract boolean getExpectFileToNotExist();
+
+ /**
+ * If non-null, the upload buffer size to be used. If null, the buffer size corresponds to {code
+ * GCSUtil.getUploadBufferSizeBytes}
+ */
+ public abstract @Nullable Integer getUploadBufferSizeBytes();
+
+ /** The content type for the created file, eg "text/plain". */
+ public abstract @Nullable String getContentType();
+
+ public static Builder builder() {
+ return new AutoValue_GcsUtil_CreateOptions.Builder().setExpectFileToNotExist(false);
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setContentType(String value);
+
+ public abstract Builder setUploadBufferSizeBytes(int value);
+
+ public abstract Builder setExpectFileToNotExist(boolean value);
+
+ public abstract CreateOptions build();
+ }
}
/**
- * Same as {@link GcsUtil#create(GcsPath, String)} but allows overriding {code
- * uploadBufferSizeBytes}.
+ * Creates an object in GCS and prepares for uploading its contents.
+ *
+ * @param path the GCS file to write to
+ * @param options to be used for creating and configuring file upload
+ * @return a WritableByteChannel that can be used to write data to the object.
*/
- public WritableByteChannel create(GcsPath path, String type, Integer uploadBufferSizeBytes)
- throws IOException {
+ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IOException {
AsyncWriteChannelOptions wcOptions = googleCloudStorageOptions.getWriteChannelOptions();
- int uploadChunkSize =
- (uploadBufferSizeBytes == null) ? wcOptions.getUploadChunkSize() : uploadBufferSizeBytes;
- AsyncWriteChannelOptions newOptions =
- wcOptions.toBuilder().setUploadChunkSize(uploadChunkSize).build();
+ @Nullable
+ Integer uploadBufferSizeBytes =
+ options.getUploadBufferSizeBytes() != null
+ ? options.getUploadBufferSizeBytes()
+ : getUploadBufferSizeBytes();
+ if (uploadBufferSizeBytes != null) {
+ wcOptions = wcOptions.toBuilder().setUploadChunkSize(uploadBufferSizeBytes).build();
+ }
GoogleCloudStorageOptions newGoogleCloudStorageOptions =
- googleCloudStorageOptions.toBuilder().setWriteChannelOptions(newOptions).build();
+ googleCloudStorageOptions.toBuilder().setWriteChannelOptions(wcOptions).build();
GoogleCloudStorage gcpStorage =
new GoogleCloudStorageImpl(
newGoogleCloudStorageOptions, this.storageClient, this.credentials);
+ StorageResourceId resourceId =
+ new StorageResourceId(
+ path.getBucket(),
+ path.getObject(),
+ // If we expect the file not to exist, we set a generation id of 0. This avoids a read
+ // to identify the object exists already and should be overwritten.
+ // See {@link GoogleCloudStorage#create(StorageResourceId, GoogleCloudStorageOptions)}
+ options.getExpectFileToNotExist() ? 0L : StorageResourceId.UNKNOWN_GENERATION_ID);
+ CreateObjectOptions.Builder createBuilder =
+ CreateObjectOptions.builder().setOverwriteExisting(true);
+ if (options.getContentType() != null) {
+ createBuilder = createBuilder.setContentType(options.getContentType());
+ }
+
HashMap<String, String> baseLabels = new HashMap<>();
baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Storage");
@@ -528,13 +586,7 @@
ServiceCallMetric serviceCallMetric =
new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
try {
- WritableByteChannel channel =
- gcpStorage.create(
- new StorageResourceId(path.getBucket(), path.getObject()),
- CreateObjectOptions.builder()
- .setOverwriteExisting(true)
- .setContentType(type)
- .build());
+ WritableByteChannel channel = gcpStorage.create(resourceId, createBuilder.build());
serviceCallMetric.call("ok");
return channel;
} catch (IOException e) {