[BEAM-12740] Add option to CreateOptions to avoid GetObjectMetadata for creating unique temporary GCS file.
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index f5bde8e..a212fd1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -39,9 +39,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeFalse;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
@@ -267,7 +265,7 @@
   private static GcsUtil buildMockGcsUtil() throws IOException {
     GcsUtil mockGcsUtil = mock(GcsUtil.class);
 
-    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+    when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
         .then(
             invocation ->
                 FileChannel.open(
@@ -276,7 +274,7 @@
                     StandardOpenOption.WRITE,
                     StandardOpenOption.DELETE_ON_CLOSE));
 
-    when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt()))
+    when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
         .then(
             invocation ->
                 FileChannel.open(
@@ -882,7 +880,7 @@
     options.setGcsUtil(mockGcsUtil);
     options.setGcpCredential(new TestCredential());
 
-    when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt()))
+    when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
         .then(
             invocation ->
                 FileChannel.open(
@@ -950,7 +948,7 @@
     options.setGcsUtil(mockGcsUtil);
     options.setGcpCredential(new TestCredential());
 
-    when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt()))
+    when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
         .then(
             invocation ->
                 FileChannel.open(
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 5c6e813..ca5dd51 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -27,7 +27,6 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Mockito.times;
@@ -284,7 +283,8 @@
             ImmutableList.of(
                 StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
 
-    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+    when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
+        .thenReturn(pipe.sink());
 
     List<DataflowPackage> targets =
         defaultPackageUtil.stageClasspathElements(
@@ -294,7 +294,7 @@
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
     verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
-    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
     verifyNoMoreInteractions(mockGcsUtil);
 
     assertThat(target.getName(), endsWith(".txt"));
@@ -313,7 +313,7 @@
             ImmutableList.of(
                 StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
 
-    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+    when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
         .thenAnswer(invocation -> Pipe.open().sink());
 
     List<DataflowPackage> targets =
@@ -342,7 +342,8 @@
         .thenReturn(
             ImmutableList.of(
                 StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+    when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
+        .thenReturn(pipe.sink());
 
     defaultPackageUtil.stageClasspathElements(
         ImmutableList.of(makeStagedFile(tmpDirectory.getAbsolutePath())),
@@ -350,7 +351,7 @@
         createOptions);
 
     verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
-    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
     verifyNoMoreInteractions(mockGcsUtil);
 
     List<String> zipEntryNames = new ArrayList<>();
@@ -375,7 +376,8 @@
         .thenReturn(
             ImmutableList.of(
                 StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+    when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
+        .thenReturn(pipe.sink());
 
     List<DataflowPackage> targets =
         defaultPackageUtil.stageClasspathElements(
@@ -385,7 +387,7 @@
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
     verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
-    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
     verifyNoMoreInteractions(mockGcsUtil);
 
     assertThat(target.getName(), endsWith(".jar"));
@@ -403,7 +405,7 @@
         .thenReturn(
             ImmutableList.of(
                 StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+    when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
         .thenThrow(new IOException("Fake Exception: Upload error"));
 
     try (PackageUtil directPackageUtil =
@@ -415,7 +417,7 @@
           createOptions);
     } finally {
       verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
-      verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString());
+      verify(mockGcsUtil, times(5)).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
       verifyNoMoreInteractions(mockGcsUtil);
     }
   }
@@ -427,7 +429,7 @@
         .thenReturn(
             ImmutableList.of(
                 StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+    when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
         .thenThrow(
             new IOException(
                 "Failed to write to GCS path " + STAGING_PATH,
@@ -460,7 +462,7 @@
                       + "login'")));
     } finally {
       verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
-      verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+      verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
       verifyNoMoreInteractions(mockGcsUtil);
     }
   }
@@ -473,7 +475,7 @@
         .thenReturn(
             ImmutableList.of(
                 StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+    when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
         .thenThrow(new IOException("Fake Exception: 410 Gone")) // First attempt fails
         .thenReturn(pipe.sink()); // second attempt succeeds
 
@@ -486,7 +488,7 @@
           createOptions);
     } finally {
       verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
-      verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
+      verify(mockGcsUtil, times(2)).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
       verifyNoMoreInteractions(mockGcsUtil);
     }
   }
@@ -520,7 +522,8 @@
             ImmutableList.of(
                 StorageObjectOrIOException.create(
                     createStorageObject(STAGING_PATH, Long.MAX_VALUE))));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+    when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
+        .thenReturn(pipe.sink());
 
     defaultPackageUtil.stageClasspathElements(
         ImmutableList.of(makeStagedFile(tmpDirectory.getAbsolutePath())),
@@ -528,7 +531,7 @@
         createOptions);
 
     verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
-    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
     verifyNoMoreInteractions(mockGcsUtil);
   }
 
@@ -542,7 +545,8 @@
         .thenReturn(
             ImmutableList.of(
                 StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
-    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+    when(mockGcsUtil.create(any(GcsPath.class), any(GcsUtil.CreateOptions.class)))
+        .thenReturn(pipe.sink());
 
     List<DataflowPackage> targets =
         defaultPackageUtil.stageClasspathElements(
@@ -552,7 +556,7 @@
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
     verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
-    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verify(mockGcsUtil).create(any(GcsPath.class), any(GcsUtil.CreateOptions.class));
     verifyNoMoreInteractions(mockGcsUtil);
 
     assertThat(target.getName(), equalTo(overriddenName));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index f735ec3..6c3b176 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -50,6 +50,8 @@
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
@@ -948,7 +950,16 @@
           getWriteOperation().getSink().writableByteChannelFactory;
       // The factory may force a MIME type or it may return null, indicating to use the sink's MIME.
       String channelMimeType = firstNonNull(factory.getMimeType(), mimeType);
-      WritableByteChannel tempChannel = FileSystems.create(outputFile, channelMimeType);
+      CreateOptions createOptions =
+          StandardCreateOptions.builder()
+              .setMimeType(channelMimeType)
+              // The file is based upon a uuid and thus we expect it to be unique and to not already
+              // exist. A new uuid is generated on each bundle processing and thus this also holds
+              // across bundle retries. Collisions of filenames would result in data loss as we
+              // would otherwise overwrite already finalized data.
+              .setExpectFileToNotExist(true)
+              .build();
+      WritableByteChannel tempChannel = FileSystems.create(outputFile, createOptions);
       try {
         channel = factory.create(tempChannel);
       } catch (Exception e) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/CreateOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/CreateOptions.java
index 331e66f..85d2311 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/CreateOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/CreateOptions.java
@@ -24,9 +24,14 @@
   /** The file-like resource mime type. */
   public abstract String mimeType();
 
+  /** True if the file is expected to not exist. */
+  public abstract Boolean expectFileToNotExist();
+
   /** An abstract builder for {@link CreateOptions}. */
   public abstract static class Builder<BuilderT extends CreateOptions.Builder<BuilderT>> {
     public abstract BuilderT setMimeType(String value);
+
+    public abstract BuilderT setExpectFileToNotExist(Boolean value);
   }
 
   /** A standard configuration options with builder. */
@@ -35,7 +40,8 @@
 
     /** Returns a {@link StandardCreateOptions.Builder}. */
     public static StandardCreateOptions.Builder builder() {
-      return new AutoValue_CreateOptions_StandardCreateOptions.Builder();
+      return new AutoValue_CreateOptions_StandardCreateOptions.Builder()
+          .setExpectFileToNotExist(false);
     }
 
     /** Builder for {@link StandardCreateOptions}. */
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java
index 1fdb871..ce534d6 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java
@@ -37,7 +37,7 @@
 
   /** Returns a {@link GcsCreateOptions.Builder}. */
   public static GcsCreateOptions.Builder builder() {
-    return new AutoValue_GcsCreateOptions.Builder();
+    return new AutoValue_GcsCreateOptions.Builder().setExpectFileToNotExist(false);
   }
 
   /** A builder for {@link GcsCreateOptions}. */
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
index 6c39428..a547bc9 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
@@ -133,16 +133,16 @@
   @Override
   protected WritableByteChannel create(GcsResourceId resourceId, CreateOptions createOptions)
       throws IOException {
+    GcsUtil.CreateOptions.Builder builder =
+        GcsUtil.CreateOptions.builder()
+            .setContentType(createOptions.mimeType())
+            .setExpectFileToNotExist(createOptions.expectFileToNotExist());
     if (createOptions instanceof GcsCreateOptions) {
-      return options
-          .getGcsUtil()
-          .create(
-              resourceId.getGcsPath(),
-              createOptions.mimeType(),
+      builder =
+          builder.setUploadBufferSizeBytes(
               ((GcsCreateOptions) createOptions).gcsUploadBufferSizeBytes());
-    } else {
-      return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType());
     }
+    return options.getGcsUtil().create(resourceId.getGcsPath(), builder.build());
   }
 
   @Override
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index f6ad78f..668d94b 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -485,35 +485,93 @@
     }
   }
 
-  /**
-   * Creates an object in GCS.
-   *
-   * <p>Returns a WritableByteChannel that can be used to write data to the object.
-   *
-   * @param path the GCS file to write to
-   * @param type the type of object, eg "text/plain".
-   * @return a Callable object that encloses the operation.
-   */
+  /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
+  @Deprecated
   public WritableByteChannel create(GcsPath path, String type) throws IOException {
-    return create(path, type, uploadBufferSizeBytes);
+    CreateOptions.Builder builder = CreateOptions.builder().setContentType(type);
+    return create(path, builder.build());
+  }
+
+  /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
+  @Deprecated
+  public WritableByteChannel create(GcsPath path, String type, Integer uploadBufferSizeBytes)
+      throws IOException {
+    CreateOptions.Builder builder =
+        CreateOptions.builder()
+            .setContentType(type)
+            .setUploadBufferSizeBytes(uploadBufferSizeBytes);
+    return create(path, builder.build());
+  }
+
+  @AutoValue
+  public abstract static class CreateOptions {
+    /**
+     * If true, the created file is expected to not exist. Instead of checking for file presence
+     * before writing a write exception may occur if the file does exist.
+     */
+    public abstract boolean getExpectFileToNotExist();
+
+    /**
+     * If non-null, the upload buffer size to be used. If null, the buffer size corresponds to {code
+     * GCSUtil.getUploadBufferSizeBytes}
+     */
+    public abstract @Nullable Integer getUploadBufferSizeBytes();
+
+    /** The content type for the created file, eg "text/plain". */
+    public abstract @Nullable String getContentType();
+
+    public static Builder builder() {
+      return new AutoValue_GcsUtil_CreateOptions.Builder().setExpectFileToNotExist(false);
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setContentType(String value);
+
+      public abstract Builder setUploadBufferSizeBytes(int value);
+
+      public abstract Builder setExpectFileToNotExist(boolean value);
+
+      public abstract CreateOptions build();
+    }
   }
 
   /**
-   * Same as {@link GcsUtil#create(GcsPath, String)} but allows overriding {code
-   * uploadBufferSizeBytes}.
+   * Creates an object in GCS and prepares for uploading its contents.
+   *
+   * @param path the GCS file to write to
+   * @param options to be used for creating and configuring file upload
+   * @return a WritableByteChannel that can be used to write data to the object.
    */
-  public WritableByteChannel create(GcsPath path, String type, Integer uploadBufferSizeBytes)
-      throws IOException {
+  public WritableByteChannel create(GcsPath path, CreateOptions options) throws IOException {
     AsyncWriteChannelOptions wcOptions = googleCloudStorageOptions.getWriteChannelOptions();
-    int uploadChunkSize =
-        (uploadBufferSizeBytes == null) ? wcOptions.getUploadChunkSize() : uploadBufferSizeBytes;
-    AsyncWriteChannelOptions newOptions =
-        wcOptions.toBuilder().setUploadChunkSize(uploadChunkSize).build();
+    @Nullable
+    Integer uploadBufferSizeBytes =
+        options.getUploadBufferSizeBytes() != null
+            ? options.getUploadBufferSizeBytes()
+            : getUploadBufferSizeBytes();
+    if (uploadBufferSizeBytes != null) {
+      wcOptions = wcOptions.toBuilder().setUploadChunkSize(uploadBufferSizeBytes).build();
+    }
     GoogleCloudStorageOptions newGoogleCloudStorageOptions =
-        googleCloudStorageOptions.toBuilder().setWriteChannelOptions(newOptions).build();
+        googleCloudStorageOptions.toBuilder().setWriteChannelOptions(wcOptions).build();
     GoogleCloudStorage gcpStorage =
         new GoogleCloudStorageImpl(
             newGoogleCloudStorageOptions, this.storageClient, this.credentials);
+    StorageResourceId resourceId =
+        new StorageResourceId(
+            path.getBucket(),
+            path.getObject(),
+            // If we expect the file not to exist, we set a generation id of 0. This avoids a read
+            // to identify the object exists already and should be overwritten.
+            // See {@link GoogleCloudStorage#create(StorageResourceId, GoogleCloudStorageOptions)}
+            options.getExpectFileToNotExist() ? 0L : StorageResourceId.UNKNOWN_GENERATION_ID);
+    CreateObjectOptions.Builder createBuilder =
+        CreateObjectOptions.builder().setOverwriteExisting(true);
+    if (options.getContentType() != null) {
+      createBuilder = createBuilder.setContentType(options.getContentType());
+    }
+
     HashMap<String, String> baseLabels = new HashMap<>();
     baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
     baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Storage");
@@ -528,13 +586,7 @@
     ServiceCallMetric serviceCallMetric =
         new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
     try {
-      WritableByteChannel channel =
-          gcpStorage.create(
-              new StorageResourceId(path.getBucket(), path.getObject()),
-              CreateObjectOptions.builder()
-                  .setOverwriteExisting(true)
-                  .setContentType(type)
-                  .build());
+      WritableByteChannel channel = gcpStorage.create(resourceId, createBuilder.build());
       serviceCallMetric.call("ok");
       return channel;
     } catch (IOException e) {