Merge pull request #14691 from Add PatchResources to FhirIO.

* Add PatchResources to FhirIO.

References:
https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/patch
https://cloud.google.com/healthcare/docs/how-tos/fhir-resources#patching_a_fhir_resource
https://cloud.google.com/healthcare/docs/how-tos/fhir-resources#conditionally_patching_a_fhir_resource

The Google Cloud FHIR service doesn't support neither Patch nor
Conditional Patch within bundles, so it requires it's own connector.

* Run spotless

* Fix @Nullable tags

* Fix guava import

* Prototype AutoValue implementation

* Final AutoValue implementation

* Add GroupIntoBatches to PatchResources.

* Private constructor
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
index 5bc73ce..7fd44ce 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
@@ -28,6 +28,7 @@
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
@@ -58,6 +59,7 @@
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.fs.ResourceIdCoder;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input;
 import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.metrics.Counter;
@@ -65,6 +67,8 @@
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
@@ -101,19 +105,19 @@
  * <h3>Reading</h3>
  *
  * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use cases where you have a
- * ${@link PCollection} of message IDs. This is appropriate for reading the Fhir notifications from
+ * ${@link PCollection} of FHIR resource names in the format of projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}. This is appropriate for reading the Fhir notifications from
  * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a manually
  * prepared list of messages that you need to process (e.g. in a text file read with {@link
  * org.apache.beam.sdk.io.TextIO}*) .
  *
- * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of message ID strings
+ * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of FHIR resource name strings
  * {@link FhirIO.Read.Result} where one can call {@link Read.Result#getResources()} to retrieve a
- * {@link PCollection} containing the successfully fetched {@link String}s and/or {@link
+ * {@link PCollection} containing the successfully fetched json resources as {@link String}s and/or {@link
  * FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of {@link
- * HealthcareIOError}* containing the resource ID that could not be fetched and the exception as a
+ * HealthcareIOError}* containing the resources that could not be fetched and the exception as a
  * {@link HealthcareIOError}, this can be used to write to the dead letter storage system of your
  * choosing. This error handling is mainly to transparently surface errors where the upstream {@link
- * PCollection}* contains IDs that are not valid or are not reachable due to permissions issues.
+ * PCollection}* contains FHIR resources that are not valid or are not reachable due to permissions issues.
  *
  * <h3>Writing</h3>
  *
@@ -383,6 +387,16 @@
   }
 
   /**
+   * Patch FHIR resources, @see <a
+   * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/patch></a>.
+   *
+   * @return the patch
+   */
+  public static PatchResources patchResources() {
+    return new PatchResources();
+  }
+
+  /**
    * Increments success and failure counters for an LRO. To be used after the LRO has completed.
    * This function leverages the fact that the LRO metadata is always of the format: "counter": {
    * "success": "1", "failure": "1" }
@@ -1330,6 +1344,7 @@
 
   /** The type Execute bundles. */
   public static class ExecuteBundles extends PTransform<PCollection<String>, Write.Result> {
+
     private final ValueProvider<String> fhirStore;
 
     /**
@@ -1417,8 +1432,109 @@
     }
   }
 
+  /** The type Patch resources. */
+  public static class PatchResources extends PTransform<PCollection<Input>, Write.Result> {
+
+    private PatchResources() {}
+
+    /** Represents the input parameters for a single FHIR patch request. */
+    @DefaultSchema(AutoValueSchema.class)
+    @AutoValue
+    abstract static class Input implements Serializable {
+      abstract String getResourceName();
+
+      abstract String getPatch();
+
+      abstract @Nullable Map<String, String> getQuery();
+
+      static Builder builder() {
+        return new AutoValue_FhirIO_PatchResources_Input.Builder();
+      }
+
+      @AutoValue.Builder
+      abstract static class Builder {
+        abstract Builder setResourceName(String resourceName);
+
+        abstract Builder setPatch(String patch);
+
+        abstract Builder setQuery(Map<String, String> query);
+
+        abstract Input build();
+      }
+    }
+
+    @Override
+    public FhirIO.Write.Result expand(PCollection<Input> input) {
+      int numShards = 10;
+      int batchSize = 10000;
+      PCollectionTuple bodies =
+          // Shard input into batches to improve worker performance.
+          input
+              .apply(
+                  "Shard input",
+                  WithKeys.of(elm -> ThreadLocalRandom.current().nextInt(0, numShards)))
+              .setCoder(KvCoder.of(TextualIntegerCoder.of(), input.getCoder()))
+              .apply("Assemble batches", GroupIntoBatches.ofSize(batchSize))
+              .setCoder(KvCoder.of(TextualIntegerCoder.of(), IterableCoder.of(input.getCoder())))
+              .apply(
+                  ParDo.of(new PatchResourcesFn())
+                      .withOutputTags(Write.SUCCESSFUL_BODY, TupleTagList.of(Write.FAILED_BODY)));
+      bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of());
+      bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+      return Write.Result.in(input.getPipeline(), bodies);
+    }
+
+    /** The type Write Fhir fn. */
+    static class PatchResourcesFn extends DoFn<KV<Integer, Iterable<Input>>, String> {
+
+      private static final Counter PATCH_RESOURCES_ERRORS =
+          Metrics.counter(
+              PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_error_count");
+      private static final Counter PATCH_RESOURCES_SUCCESS =
+          Metrics.counter(
+              PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_success_count");
+      private static final Distribution PATCH_RESOURCES_LATENCY_MS =
+          Metrics.distribution(
+              PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_latency_ms");
+
+      private transient HealthcareApiClient client;
+      private final ObjectMapper mapper = new ObjectMapper();
+
+      /**
+       * Initialize healthcare client.
+       *
+       * @throws IOException If the Healthcare client cannot be created.
+       */
+      @Setup
+      public void initClient() throws IOException {
+        this.client = new HttpHealthcareApiClient();
+      }
+
+      @ProcessElement
+      public void patchResources(ProcessContext context) {
+        Iterable<Input> batch = context.element().getValue();
+        for (Input patchParameter : batch) {
+          try {
+            long startTime = Instant.now().toEpochMilli();
+            client.patchFhirResource(
+                patchParameter.getResourceName(),
+                patchParameter.getPatch(),
+                patchParameter.getQuery());
+            PATCH_RESOURCES_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime);
+            PATCH_RESOURCES_SUCCESS.inc();
+            context.output(Write.SUCCESSFUL_BODY, patchParameter.toString());
+          } catch (IOException | HealthcareHttpException e) {
+            PATCH_RESOURCES_ERRORS.inc();
+            context.output(Write.FAILED_BODY, HealthcareIOError.of(patchParameter.toString(), e));
+          }
+        }
+      }
+    }
+  }
+
   /** Export FHIR resources from a FHIR store to new line delimited json files on GCS. */
   public static class Export extends PTransform<PBegin, PCollection<String>> {
+
     private final ValueProvider<String> fhirStore;
     private final ValueProvider<String> exportGcsUriPrefix;
 
@@ -1481,6 +1597,7 @@
 
   /** Deidentify FHIR resources from a FHIR store to a destination FHIR store. */
   public static class Deidentify extends PTransform<PBegin, PCollection<String>> {
+
     private final ValueProvider<String> sourceFhirStore;
     private final ValueProvider<String> destinationFhirStore;
     private final ValueProvider<DeidentifyConfig> deidConfig;
@@ -1551,6 +1668,7 @@
   /** The type Search. */
   public static class Search<T>
       extends PTransform<PCollection<FhirSearchParameter<T>>, FhirIO.Search.Result> {
+
     private static final Logger LOG = LoggerFactory.getLogger(Search.class);
 
     private final ValueProvider<String> fhirStore;
@@ -1564,6 +1682,7 @@
     }
 
     public static class Result implements POutput, PInput {
+
       private PCollection<KV<String, JsonArray>> keyedResources;
       private PCollection<JsonArray> resources;
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
index 6f80d86..db34d33 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
@@ -46,11 +46,26 @@
     this.queries = queries;
   }
 
+  /**
+   * Creates a FhirSearchParameter to represent a FHIR Search request.
+   *
+   * @param resourceType resource type for search, leave empty for all
+   * @param key optional key to index searches by
+   * @param queries search query, with field as key and search as value
+   * @return FhirSearchParameter
+   */
   public static <T> FhirSearchParameter<T> of(
       String resourceType, @Nullable String key, @Nullable Map<String, T> queries) {
     return new FhirSearchParameter<>(resourceType, key, queries);
   }
 
+  /**
+   * Creates a FhirSearchParameter to represent a FHIR Search request.
+   *
+   * @param resourceType resource type for search, leave empty for all
+   * @param queries search query, with field as key and search as value
+   * @return FhirSearchParameter
+   */
   public static <T> FhirSearchParameter<T> of(
       String resourceType, @Nullable Map<String, T> queries) {
     return new FhirSearchParameter<>(resourceType, null, queries);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
index f4e38c4..81b2a75 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
@@ -162,13 +162,27 @@
       throws IOException, HealthcareHttpException;
 
   /**
+   * Patch fhir resource http body.
+   *
+   * @param resourceName the resource name, in format
+   *     projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}[/{id}], id not
+   *     present when queryString is specified.
+   * @param patch the patch operation
+   * @param query optional query for conditional patches
+   * @return the http body
+   */
+  HttpBody patchFhirResource(String resourceName, String patch, @Nullable Map<String, String> query)
+      throws IOException, HealthcareHttpException;
+
+  /**
    * Read fhir resource http body.
    *
-   * @param resourceId the resource
+   * @param resourceName the resource name, in format
+   *     projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}
    * @return the http body
    * @throws IOException the io exception
    */
-  HttpBody readFhirResource(String resourceId) throws IOException;
+  HttpBody readFhirResource(String resourceName) throws IOException;
 
   /**
    * Search fhir resource http body.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
index 283d011..929fe2a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
@@ -100,6 +100,7 @@
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class HttpHealthcareApiClient implements HealthcareApiClient, Serializable {
+
   private static final String USER_AGENT =
       String.format(
           "apache-beam-io-google-cloud-platform-healthcare/%s",
@@ -107,6 +108,7 @@
   private static final String FHIRSTORE_HEADER_CONTENT_TYPE = "application/fhir+json";
   private static final String FHIRSTORE_HEADER_ACCEPT = "application/fhir+json; charset=utf-8";
   private static final String FHIRSTORE_HEADER_ACCEPT_CHARSET = "utf-8";
+  private static final String FHIRSTORE_PATCH_CONTENT_TYPE = "application/json-patch+json";
   private static final Logger LOG = LoggerFactory.getLogger(HttpHealthcareApiClient.class);
   private transient CloudHealthcare client;
   private transient HttpClient httpClient;
@@ -594,11 +596,61 @@
     return responseModel;
   }
 
+  @Override
+  public HttpBody patchFhirResource(
+      String resourceName, String patch, @Nullable Map<String, String> query)
+      throws IOException, HealthcareHttpException {
+    if (httpClient == null || client == null) {
+      initClient();
+    }
+
+    credentials.refreshIfExpired();
+    StringEntity requestEntity = new StringEntity(patch, ContentType.APPLICATION_JSON);
+    URI uri;
+    try {
+      URIBuilder uriBuilder = new URIBuilder(client.getRootUrl() + "v1beta1/" + resourceName);
+      if (query != null) {
+        for (Map.Entry<String, String> q : query.entrySet()) {
+          uriBuilder.addParameter(q.getKey(), q.getValue());
+        }
+      }
+      uri = uriBuilder.build();
+    } catch (URISyntaxException e) {
+      LOG.error("URL error when making patch request to FHIR API. " + e.getMessage());
+      throw new IllegalArgumentException(e);
+    }
+
+    RequestBuilder requestBuilder =
+        RequestBuilder.patch()
+            .setUri(uri)
+            .setEntity(requestEntity)
+            .addHeader("Authorization", "Bearer " + credentials.getAccessToken().getTokenValue())
+            .addHeader("User-Agent", USER_AGENT)
+            .addHeader("Content-Type", FHIRSTORE_PATCH_CONTENT_TYPE)
+            .addHeader("Accept", FHIRSTORE_HEADER_ACCEPT)
+            .addHeader("Accept-Charset", FHIRSTORE_HEADER_ACCEPT_CHARSET);
+
+    HttpUriRequest request = requestBuilder.build();
+    HttpResponse response = httpClient.execute(request);
+    HttpEntity responseEntity = response.getEntity();
+    String content = EntityUtils.toString(responseEntity);
+
+    // Check 2XX code.
+    int statusCode = response.getStatusLine().getStatusCode();
+    if (!(statusCode / 100 == 2)) {
+      throw HealthcareHttpException.of(statusCode, content);
+    }
+    HttpBody responseModel = new HttpBody();
+    responseModel.setData(content);
+    return responseModel;
+  }
+
   /**
    * Wraps {@link HttpResponse} in an exception with a statusCode field for use with {@link
    * HealthcareIOError}.
    */
   public static class HealthcareHttpException extends Exception {
+
     private final int statusCode;
 
     private HealthcareHttpException(int statusCode, String message) {
@@ -630,8 +682,15 @@
   }
 
   @Override
-  public HttpBody readFhirResource(String resourceId) throws IOException {
-    return client.projects().locations().datasets().fhirStores().fhir().read(resourceId).execute();
+  public HttpBody readFhirResource(String resourceName) throws IOException {
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .fhirStores()
+        .fhir()
+        .read(resourceName)
+        .execute();
   }
 
   @Override
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java
new file mode 100644
index 0000000..835d717
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE;
+
+import com.google.api.services.healthcare.v1beta1.model.HttpBody;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class FhirIOPatchIT {
+
+  public String version;
+
+  @Parameters(name = "{0}")
+  public static Collection<String> versions() {
+    return Arrays.asList("R4");
+  }
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  private final String project;
+  private transient HealthcareApiClient client;
+  private static String healthcareDataset;
+  private String fhirStoreId;
+  private String resourceName;
+  private static final String BASE_STORE_ID =
+      "FHIR_store_patch_it_" + System.currentTimeMillis() + "_" + (new SecureRandom().nextInt(32));
+
+  public FhirIOPatchIT(String version) {
+    this.version = version;
+    this.fhirStoreId = BASE_STORE_ID + version;
+    this.project =
+        TestPipeline.testingPipelineOptions()
+            .as(HealthcareStoreTestPipelineOptions.class)
+            .getStoreProjectId();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project);
+    if (client == null) {
+      this.client = new HttpHealthcareApiClient();
+    }
+    client.createFhirStore(healthcareDataset, fhirStoreId, version, "");
+
+    resourceName = healthcareDataset + "/fhirStores/" + fhirStoreId + "/fhir/Patient/123";
+    String bundle =
+        "{\"resourceType\":\"Bundle\","
+            + "\"type\":\"transaction\","
+            + "\"entry\": [{"
+            + "\"request\":{\"method\":\"PUT\",\"url\":\"Patient/123\"},"
+            + "\"resource\":{\"resourceType\":\"Patient\",\"id\":\"123\",\"birthDate\": \"1990-01-01\"}"
+            + "}]}";
+    FhirIOTestUtil.executeFhirBundles(
+        client, healthcareDataset + "/fhirStores/" + fhirStoreId, ImmutableList.of(bundle));
+  }
+
+  @After
+  public void teardown() throws IOException {
+    HealthcareApiClient client = new HttpHealthcareApiClient();
+    for (String version : versions()) {
+      client.deleteFhirStore(healthcareDataset + "/fhirStores/" + BASE_STORE_ID + version);
+    }
+  }
+
+  @Test
+  public void testFhirIOPatch() throws IOException {
+    pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
+
+    Input patchParameter =
+        Input.builder()
+            .setResourceName(resourceName)
+            .setPatch(
+                "[{\"op\": \"replace\", \"path\": \"/birthDate\", \"value\": \"1997-05-23\"}]")
+            .build();
+    String expectedSuccessBody = patchParameter.toString();
+
+    // Execute patch.
+    PCollection<Input> patches = pipeline.apply(Create.of(patchParameter));
+    FhirIO.Write.Result result = patches.apply(FhirIO.patchResources());
+
+    // Validate beam results.
+    PAssert.that(result.getFailedBodies()).empty();
+    PCollection<String> successfulBodies = result.getSuccessfulBodies();
+    PAssert.that(successfulBodies)
+        .satisfies(
+            input -> {
+              for (String body : input) {
+                Assert.assertEquals(expectedSuccessBody, body);
+              }
+              return null;
+            });
+
+    pipeline.run().waitUntilFinish();
+
+    // Validate FHIR store contents.
+    HttpBody readResult = client.readFhirResource(resourceName);
+    Assert.assertEquals("1997-05-23", readResult.get("birthDate"));
+  }
+
+  @Test
+  public void testFhirIOPatch_ifMatch() throws IOException {
+    pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
+
+    Input patchParameter =
+        Input.builder()
+            .setResourceName(healthcareDataset + "/fhirStores/" + fhirStoreId + "/fhir/Patient")
+            .setPatch(
+                "[{\"op\": \"replace\", \"path\": \"/birthDate\", \"value\": \"1997-06-23\"}]")
+            .setQuery(ImmutableMap.of("birthDate", "1990-01-01"))
+            .build();
+    String expectedSuccessBody = patchParameter.toString();
+
+    // Execute patch.
+    PCollection<Input> patches = pipeline.apply(Create.of(patchParameter));
+    FhirIO.Write.Result result = patches.apply(FhirIO.patchResources());
+
+    // Validate beam results.
+    PAssert.that(result.getFailedBodies()).empty();
+    PCollection<String> successfulBodies = result.getSuccessfulBodies();
+    PAssert.that(successfulBodies)
+        .satisfies(
+            input -> {
+              for (String body : input) {
+                Assert.assertEquals(expectedSuccessBody, body);
+              }
+              return null;
+            });
+
+    pipeline.run().waitUntilFinish();
+
+    // Validate FHIR store contents.
+    HttpBody readResult = client.readFhirResource(resourceName);
+    Assert.assertEquals("1997-06-23", readResult.get("birthDate"));
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
index 8313a01..eecd371 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -31,6 +32,7 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -133,6 +135,27 @@
     pipeline.run();
   }
 
+  @Test
+  public void test_FhirIO_failedPatch() {
+    Input badPatch = Input.builder().setPatch("").setResourceName("").build();
+    PCollection<Input> patches = pipeline.apply(Create.of(ImmutableList.of(badPatch)));
+    FhirIO.Write.Result patchResult = patches.apply(FhirIO.patchResources());
+
+    PCollection<HealthcareIOError<String>> failedInserts = patchResult.getFailedBodies();
+
+    PAssert.thatSingleton(failedInserts)
+        .satisfies(
+            (HealthcareIOError<String> err) -> {
+              Assert.assertEquals(badPatch.toString(), err.getDataResource());
+              return null;
+            });
+    PCollection<Long> numFailedInserts = failedInserts.apply(Count.globally());
+
+    PAssert.thatSingleton(numFailedInserts).isEqualTo(1L);
+
+    pipeline.run();
+  }
+
   private static final long NUM_ELEMENTS = 11;
 
   private static ArrayList<KV<String, String>> createTestData() {