Merge pull request #14691 from Add PatchResources to FhirIO.
* Add PatchResources to FhirIO.
References:
https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/patch
https://cloud.google.com/healthcare/docs/how-tos/fhir-resources#patching_a_fhir_resource
https://cloud.google.com/healthcare/docs/how-tos/fhir-resources#conditionally_patching_a_fhir_resource
The Google Cloud FHIR service doesn't support neither Patch nor
Conditional Patch within bundles, so it requires it's own connector.
* Run spotless
* Fix @Nullable tags
* Fix guava import
* Prototype AutoValue implementation
* Final AutoValue implementation
* Add GroupIntoBatches to PatchResources.
* Private constructor
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
index 5bc73ce..7fd44ce 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
@@ -28,6 +28,7 @@
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import java.io.IOException;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
@@ -58,6 +59,7 @@
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.fs.ResourceIdCoder;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input;
import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.metrics.Counter;
@@ -65,6 +67,8 @@
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
@@ -101,19 +105,19 @@
* <h3>Reading</h3>
*
* <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use cases where you have a
- * ${@link PCollection} of message IDs. This is appropriate for reading the Fhir notifications from
+ * ${@link PCollection} of FHIR resource names in the format of projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}. This is appropriate for reading the Fhir notifications from
* a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a manually
* prepared list of messages that you need to process (e.g. in a text file read with {@link
* org.apache.beam.sdk.io.TextIO}*) .
*
- * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of message ID strings
+ * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of FHIR resource name strings
* {@link FhirIO.Read.Result} where one can call {@link Read.Result#getResources()} to retrieve a
- * {@link PCollection} containing the successfully fetched {@link String}s and/or {@link
+ * {@link PCollection} containing the successfully fetched json resources as {@link String}s and/or {@link
* FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of {@link
- * HealthcareIOError}* containing the resource ID that could not be fetched and the exception as a
+ * HealthcareIOError}* containing the resources that could not be fetched and the exception as a
* {@link HealthcareIOError}, this can be used to write to the dead letter storage system of your
* choosing. This error handling is mainly to transparently surface errors where the upstream {@link
- * PCollection}* contains IDs that are not valid or are not reachable due to permissions issues.
+ * PCollection}* contains FHIR resources that are not valid or are not reachable due to permissions issues.
*
* <h3>Writing</h3>
*
@@ -383,6 +387,16 @@
}
/**
+ * Patch FHIR resources, @see <a
+ * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/patch></a>.
+ *
+ * @return the patch
+ */
+ public static PatchResources patchResources() {
+ return new PatchResources();
+ }
+
+ /**
* Increments success and failure counters for an LRO. To be used after the LRO has completed.
* This function leverages the fact that the LRO metadata is always of the format: "counter": {
* "success": "1", "failure": "1" }
@@ -1330,6 +1344,7 @@
/** The type Execute bundles. */
public static class ExecuteBundles extends PTransform<PCollection<String>, Write.Result> {
+
private final ValueProvider<String> fhirStore;
/**
@@ -1417,8 +1432,109 @@
}
}
+ /** The type Patch resources. */
+ public static class PatchResources extends PTransform<PCollection<Input>, Write.Result> {
+
+ private PatchResources() {}
+
+ /** Represents the input parameters for a single FHIR patch request. */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ abstract static class Input implements Serializable {
+ abstract String getResourceName();
+
+ abstract String getPatch();
+
+ abstract @Nullable Map<String, String> getQuery();
+
+ static Builder builder() {
+ return new AutoValue_FhirIO_PatchResources_Input.Builder();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setResourceName(String resourceName);
+
+ abstract Builder setPatch(String patch);
+
+ abstract Builder setQuery(Map<String, String> query);
+
+ abstract Input build();
+ }
+ }
+
+ @Override
+ public FhirIO.Write.Result expand(PCollection<Input> input) {
+ int numShards = 10;
+ int batchSize = 10000;
+ PCollectionTuple bodies =
+ // Shard input into batches to improve worker performance.
+ input
+ .apply(
+ "Shard input",
+ WithKeys.of(elm -> ThreadLocalRandom.current().nextInt(0, numShards)))
+ .setCoder(KvCoder.of(TextualIntegerCoder.of(), input.getCoder()))
+ .apply("Assemble batches", GroupIntoBatches.ofSize(batchSize))
+ .setCoder(KvCoder.of(TextualIntegerCoder.of(), IterableCoder.of(input.getCoder())))
+ .apply(
+ ParDo.of(new PatchResourcesFn())
+ .withOutputTags(Write.SUCCESSFUL_BODY, TupleTagList.of(Write.FAILED_BODY)));
+ bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of());
+ bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+ return Write.Result.in(input.getPipeline(), bodies);
+ }
+
+ /** The type Write Fhir fn. */
+ static class PatchResourcesFn extends DoFn<KV<Integer, Iterable<Input>>, String> {
+
+ private static final Counter PATCH_RESOURCES_ERRORS =
+ Metrics.counter(
+ PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_error_count");
+ private static final Counter PATCH_RESOURCES_SUCCESS =
+ Metrics.counter(
+ PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_success_count");
+ private static final Distribution PATCH_RESOURCES_LATENCY_MS =
+ Metrics.distribution(
+ PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_latency_ms");
+
+ private transient HealthcareApiClient client;
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ /**
+ * Initialize healthcare client.
+ *
+ * @throws IOException If the Healthcare client cannot be created.
+ */
+ @Setup
+ public void initClient() throws IOException {
+ this.client = new HttpHealthcareApiClient();
+ }
+
+ @ProcessElement
+ public void patchResources(ProcessContext context) {
+ Iterable<Input> batch = context.element().getValue();
+ for (Input patchParameter : batch) {
+ try {
+ long startTime = Instant.now().toEpochMilli();
+ client.patchFhirResource(
+ patchParameter.getResourceName(),
+ patchParameter.getPatch(),
+ patchParameter.getQuery());
+ PATCH_RESOURCES_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime);
+ PATCH_RESOURCES_SUCCESS.inc();
+ context.output(Write.SUCCESSFUL_BODY, patchParameter.toString());
+ } catch (IOException | HealthcareHttpException e) {
+ PATCH_RESOURCES_ERRORS.inc();
+ context.output(Write.FAILED_BODY, HealthcareIOError.of(patchParameter.toString(), e));
+ }
+ }
+ }
+ }
+ }
+
/** Export FHIR resources from a FHIR store to new line delimited json files on GCS. */
public static class Export extends PTransform<PBegin, PCollection<String>> {
+
private final ValueProvider<String> fhirStore;
private final ValueProvider<String> exportGcsUriPrefix;
@@ -1481,6 +1597,7 @@
/** Deidentify FHIR resources from a FHIR store to a destination FHIR store. */
public static class Deidentify extends PTransform<PBegin, PCollection<String>> {
+
private final ValueProvider<String> sourceFhirStore;
private final ValueProvider<String> destinationFhirStore;
private final ValueProvider<DeidentifyConfig> deidConfig;
@@ -1551,6 +1668,7 @@
/** The type Search. */
public static class Search<T>
extends PTransform<PCollection<FhirSearchParameter<T>>, FhirIO.Search.Result> {
+
private static final Logger LOG = LoggerFactory.getLogger(Search.class);
private final ValueProvider<String> fhirStore;
@@ -1564,6 +1682,7 @@
}
public static class Result implements POutput, PInput {
+
private PCollection<KV<String, JsonArray>> keyedResources;
private PCollection<JsonArray> resources;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
index 6f80d86..db34d33 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
@@ -46,11 +46,26 @@
this.queries = queries;
}
+ /**
+ * Creates a FhirSearchParameter to represent a FHIR Search request.
+ *
+ * @param resourceType resource type for search, leave empty for all
+ * @param key optional key to index searches by
+ * @param queries search query, with field as key and search as value
+ * @return FhirSearchParameter
+ */
public static <T> FhirSearchParameter<T> of(
String resourceType, @Nullable String key, @Nullable Map<String, T> queries) {
return new FhirSearchParameter<>(resourceType, key, queries);
}
+ /**
+ * Creates a FhirSearchParameter to represent a FHIR Search request.
+ *
+ * @param resourceType resource type for search, leave empty for all
+ * @param queries search query, with field as key and search as value
+ * @return FhirSearchParameter
+ */
public static <T> FhirSearchParameter<T> of(
String resourceType, @Nullable Map<String, T> queries) {
return new FhirSearchParameter<>(resourceType, null, queries);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
index f4e38c4..81b2a75 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
@@ -162,13 +162,27 @@
throws IOException, HealthcareHttpException;
/**
+ * Patch fhir resource http body.
+ *
+ * @param resourceName the resource name, in format
+ * projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}[/{id}], id not
+ * present when queryString is specified.
+ * @param patch the patch operation
+ * @param query optional query for conditional patches
+ * @return the http body
+ */
+ HttpBody patchFhirResource(String resourceName, String patch, @Nullable Map<String, String> query)
+ throws IOException, HealthcareHttpException;
+
+ /**
* Read fhir resource http body.
*
- * @param resourceId the resource
+ * @param resourceName the resource name, in format
+ * projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}
* @return the http body
* @throws IOException the io exception
*/
- HttpBody readFhirResource(String resourceId) throws IOException;
+ HttpBody readFhirResource(String resourceName) throws IOException;
/**
* Search fhir resource http body.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
index 283d011..929fe2a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
@@ -100,6 +100,7 @@
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class HttpHealthcareApiClient implements HealthcareApiClient, Serializable {
+
private static final String USER_AGENT =
String.format(
"apache-beam-io-google-cloud-platform-healthcare/%s",
@@ -107,6 +108,7 @@
private static final String FHIRSTORE_HEADER_CONTENT_TYPE = "application/fhir+json";
private static final String FHIRSTORE_HEADER_ACCEPT = "application/fhir+json; charset=utf-8";
private static final String FHIRSTORE_HEADER_ACCEPT_CHARSET = "utf-8";
+ private static final String FHIRSTORE_PATCH_CONTENT_TYPE = "application/json-patch+json";
private static final Logger LOG = LoggerFactory.getLogger(HttpHealthcareApiClient.class);
private transient CloudHealthcare client;
private transient HttpClient httpClient;
@@ -594,11 +596,61 @@
return responseModel;
}
+ @Override
+ public HttpBody patchFhirResource(
+ String resourceName, String patch, @Nullable Map<String, String> query)
+ throws IOException, HealthcareHttpException {
+ if (httpClient == null || client == null) {
+ initClient();
+ }
+
+ credentials.refreshIfExpired();
+ StringEntity requestEntity = new StringEntity(patch, ContentType.APPLICATION_JSON);
+ URI uri;
+ try {
+ URIBuilder uriBuilder = new URIBuilder(client.getRootUrl() + "v1beta1/" + resourceName);
+ if (query != null) {
+ for (Map.Entry<String, String> q : query.entrySet()) {
+ uriBuilder.addParameter(q.getKey(), q.getValue());
+ }
+ }
+ uri = uriBuilder.build();
+ } catch (URISyntaxException e) {
+ LOG.error("URL error when making patch request to FHIR API. " + e.getMessage());
+ throw new IllegalArgumentException(e);
+ }
+
+ RequestBuilder requestBuilder =
+ RequestBuilder.patch()
+ .setUri(uri)
+ .setEntity(requestEntity)
+ .addHeader("Authorization", "Bearer " + credentials.getAccessToken().getTokenValue())
+ .addHeader("User-Agent", USER_AGENT)
+ .addHeader("Content-Type", FHIRSTORE_PATCH_CONTENT_TYPE)
+ .addHeader("Accept", FHIRSTORE_HEADER_ACCEPT)
+ .addHeader("Accept-Charset", FHIRSTORE_HEADER_ACCEPT_CHARSET);
+
+ HttpUriRequest request = requestBuilder.build();
+ HttpResponse response = httpClient.execute(request);
+ HttpEntity responseEntity = response.getEntity();
+ String content = EntityUtils.toString(responseEntity);
+
+ // Check 2XX code.
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (!(statusCode / 100 == 2)) {
+ throw HealthcareHttpException.of(statusCode, content);
+ }
+ HttpBody responseModel = new HttpBody();
+ responseModel.setData(content);
+ return responseModel;
+ }
+
/**
* Wraps {@link HttpResponse} in an exception with a statusCode field for use with {@link
* HealthcareIOError}.
*/
public static class HealthcareHttpException extends Exception {
+
private final int statusCode;
private HealthcareHttpException(int statusCode, String message) {
@@ -630,8 +682,15 @@
}
@Override
- public HttpBody readFhirResource(String resourceId) throws IOException {
- return client.projects().locations().datasets().fhirStores().fhir().read(resourceId).execute();
+ public HttpBody readFhirResource(String resourceName) throws IOException {
+ return client
+ .projects()
+ .locations()
+ .datasets()
+ .fhirStores()
+ .fhir()
+ .read(resourceName)
+ .execute();
}
@Override
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java
new file mode 100644
index 0000000..835d717
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE;
+
+import com.google.api.services.healthcare.v1beta1.model.HttpBody;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class FhirIOPatchIT {
+
+ public String version;
+
+ @Parameters(name = "{0}")
+ public static Collection<String> versions() {
+ return Arrays.asList("R4");
+ }
+
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+ private final String project;
+ private transient HealthcareApiClient client;
+ private static String healthcareDataset;
+ private String fhirStoreId;
+ private String resourceName;
+ private static final String BASE_STORE_ID =
+ "FHIR_store_patch_it_" + System.currentTimeMillis() + "_" + (new SecureRandom().nextInt(32));
+
+ public FhirIOPatchIT(String version) {
+ this.version = version;
+ this.fhirStoreId = BASE_STORE_ID + version;
+ this.project =
+ TestPipeline.testingPipelineOptions()
+ .as(HealthcareStoreTestPipelineOptions.class)
+ .getStoreProjectId();
+ }
+
+ @Before
+ public void setup() throws Exception {
+ healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project);
+ if (client == null) {
+ this.client = new HttpHealthcareApiClient();
+ }
+ client.createFhirStore(healthcareDataset, fhirStoreId, version, "");
+
+ resourceName = healthcareDataset + "/fhirStores/" + fhirStoreId + "/fhir/Patient/123";
+ String bundle =
+ "{\"resourceType\":\"Bundle\","
+ + "\"type\":\"transaction\","
+ + "\"entry\": [{"
+ + "\"request\":{\"method\":\"PUT\",\"url\":\"Patient/123\"},"
+ + "\"resource\":{\"resourceType\":\"Patient\",\"id\":\"123\",\"birthDate\": \"1990-01-01\"}"
+ + "}]}";
+ FhirIOTestUtil.executeFhirBundles(
+ client, healthcareDataset + "/fhirStores/" + fhirStoreId, ImmutableList.of(bundle));
+ }
+
+ @After
+ public void teardown() throws IOException {
+ HealthcareApiClient client = new HttpHealthcareApiClient();
+ for (String version : versions()) {
+ client.deleteFhirStore(healthcareDataset + "/fhirStores/" + BASE_STORE_ID + version);
+ }
+ }
+
+ @Test
+ public void testFhirIOPatch() throws IOException {
+ pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
+
+ Input patchParameter =
+ Input.builder()
+ .setResourceName(resourceName)
+ .setPatch(
+ "[{\"op\": \"replace\", \"path\": \"/birthDate\", \"value\": \"1997-05-23\"}]")
+ .build();
+ String expectedSuccessBody = patchParameter.toString();
+
+ // Execute patch.
+ PCollection<Input> patches = pipeline.apply(Create.of(patchParameter));
+ FhirIO.Write.Result result = patches.apply(FhirIO.patchResources());
+
+ // Validate beam results.
+ PAssert.that(result.getFailedBodies()).empty();
+ PCollection<String> successfulBodies = result.getSuccessfulBodies();
+ PAssert.that(successfulBodies)
+ .satisfies(
+ input -> {
+ for (String body : input) {
+ Assert.assertEquals(expectedSuccessBody, body);
+ }
+ return null;
+ });
+
+ pipeline.run().waitUntilFinish();
+
+ // Validate FHIR store contents.
+ HttpBody readResult = client.readFhirResource(resourceName);
+ Assert.assertEquals("1997-05-23", readResult.get("birthDate"));
+ }
+
+ @Test
+ public void testFhirIOPatch_ifMatch() throws IOException {
+ pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
+
+ Input patchParameter =
+ Input.builder()
+ .setResourceName(healthcareDataset + "/fhirStores/" + fhirStoreId + "/fhir/Patient")
+ .setPatch(
+ "[{\"op\": \"replace\", \"path\": \"/birthDate\", \"value\": \"1997-06-23\"}]")
+ .setQuery(ImmutableMap.of("birthDate", "1990-01-01"))
+ .build();
+ String expectedSuccessBody = patchParameter.toString();
+
+ // Execute patch.
+ PCollection<Input> patches = pipeline.apply(Create.of(patchParameter));
+ FhirIO.Write.Result result = patches.apply(FhirIO.patchResources());
+
+ // Validate beam results.
+ PAssert.that(result.getFailedBodies()).empty();
+ PCollection<String> successfulBodies = result.getSuccessfulBodies();
+ PAssert.that(successfulBodies)
+ .satisfies(
+ input -> {
+ for (String body : input) {
+ Assert.assertEquals(expectedSuccessBody, body);
+ }
+ return null;
+ });
+
+ pipeline.run().waitUntilFinish();
+
+ // Validate FHIR store contents.
+ HttpBody readResult = client.readFhirResource(resourceName);
+ Assert.assertEquals("1997-06-23", readResult.get("birthDate"));
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
index 8313a01..eecd371 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
@@ -23,6 +23,7 @@
import java.util.List;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
@@ -31,6 +32,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -133,6 +135,27 @@
pipeline.run();
}
+ @Test
+ public void test_FhirIO_failedPatch() {
+ Input badPatch = Input.builder().setPatch("").setResourceName("").build();
+ PCollection<Input> patches = pipeline.apply(Create.of(ImmutableList.of(badPatch)));
+ FhirIO.Write.Result patchResult = patches.apply(FhirIO.patchResources());
+
+ PCollection<HealthcareIOError<String>> failedInserts = patchResult.getFailedBodies();
+
+ PAssert.thatSingleton(failedInserts)
+ .satisfies(
+ (HealthcareIOError<String> err) -> {
+ Assert.assertEquals(badPatch.toString(), err.getDataResource());
+ return null;
+ });
+ PCollection<Long> numFailedInserts = failedInserts.apply(Count.globally());
+
+ PAssert.thatSingleton(numFailedInserts).isEqualTo(1L);
+
+ pipeline.run();
+ }
+
private static final long NUM_ELEMENTS = 11;
private static ArrayList<KV<String, String>> createTestData() {