Revert "Merge pull request #14691 from Add PatchResources to FhirIO."
This reverts commit e37bedea
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
index 7fd44ce..5bc73ce 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
@@ -28,7 +28,6 @@
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import java.io.IOException;
-import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
@@ -59,7 +58,6 @@
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.fs.ResourceIdCoder;
-import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input;
import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.metrics.Counter;
@@ -67,8 +65,6 @@
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
@@ -105,19 +101,19 @@
* <h3>Reading</h3>
*
* <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use cases where you have a
- * ${@link PCollection} of FHIR resource names in the format of projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}. This is appropriate for reading the Fhir notifications from
+ * ${@link PCollection} of message IDs. This is appropriate for reading the Fhir notifications from
* a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a manually
* prepared list of messages that you need to process (e.g. in a text file read with {@link
* org.apache.beam.sdk.io.TextIO}*) .
*
- * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of FHIR resource name strings
+ * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of message ID strings
* {@link FhirIO.Read.Result} where one can call {@link Read.Result#getResources()} to retrieve a
- * {@link PCollection} containing the successfully fetched json resources as {@link String}s and/or {@link
+ * {@link PCollection} containing the successfully fetched {@link String}s and/or {@link
* FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of {@link
- * HealthcareIOError}* containing the resources that could not be fetched and the exception as a
+ * HealthcareIOError}* containing the resource ID that could not be fetched and the exception as a
* {@link HealthcareIOError}, this can be used to write to the dead letter storage system of your
* choosing. This error handling is mainly to transparently surface errors where the upstream {@link
- * PCollection}* contains FHIR resources that are not valid or are not reachable due to permissions issues.
+ * PCollection}* contains IDs that are not valid or are not reachable due to permissions issues.
*
* <h3>Writing</h3>
*
@@ -387,16 +383,6 @@
}
/**
- * Patch FHIR resources, @see <a
- * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/patch></a>.
- *
- * @return the patch
- */
- public static PatchResources patchResources() {
- return new PatchResources();
- }
-
- /**
* Increments success and failure counters for an LRO. To be used after the LRO has completed.
* This function leverages the fact that the LRO metadata is always of the format: "counter": {
* "success": "1", "failure": "1" }
@@ -1344,7 +1330,6 @@
/** The type Execute bundles. */
public static class ExecuteBundles extends PTransform<PCollection<String>, Write.Result> {
-
private final ValueProvider<String> fhirStore;
/**
@@ -1432,109 +1417,8 @@
}
}
- /** The type Patch resources. */
- public static class PatchResources extends PTransform<PCollection<Input>, Write.Result> {
-
- private PatchResources() {}
-
- /** Represents the input parameters for a single FHIR patch request. */
- @DefaultSchema(AutoValueSchema.class)
- @AutoValue
- abstract static class Input implements Serializable {
- abstract String getResourceName();
-
- abstract String getPatch();
-
- abstract @Nullable Map<String, String> getQuery();
-
- static Builder builder() {
- return new AutoValue_FhirIO_PatchResources_Input.Builder();
- }
-
- @AutoValue.Builder
- abstract static class Builder {
- abstract Builder setResourceName(String resourceName);
-
- abstract Builder setPatch(String patch);
-
- abstract Builder setQuery(Map<String, String> query);
-
- abstract Input build();
- }
- }
-
- @Override
- public FhirIO.Write.Result expand(PCollection<Input> input) {
- int numShards = 10;
- int batchSize = 10000;
- PCollectionTuple bodies =
- // Shard input into batches to improve worker performance.
- input
- .apply(
- "Shard input",
- WithKeys.of(elm -> ThreadLocalRandom.current().nextInt(0, numShards)))
- .setCoder(KvCoder.of(TextualIntegerCoder.of(), input.getCoder()))
- .apply("Assemble batches", GroupIntoBatches.ofSize(batchSize))
- .setCoder(KvCoder.of(TextualIntegerCoder.of(), IterableCoder.of(input.getCoder())))
- .apply(
- ParDo.of(new PatchResourcesFn())
- .withOutputTags(Write.SUCCESSFUL_BODY, TupleTagList.of(Write.FAILED_BODY)));
- bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of());
- bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
- return Write.Result.in(input.getPipeline(), bodies);
- }
-
- /** The type Write Fhir fn. */
- static class PatchResourcesFn extends DoFn<KV<Integer, Iterable<Input>>, String> {
-
- private static final Counter PATCH_RESOURCES_ERRORS =
- Metrics.counter(
- PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_error_count");
- private static final Counter PATCH_RESOURCES_SUCCESS =
- Metrics.counter(
- PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_success_count");
- private static final Distribution PATCH_RESOURCES_LATENCY_MS =
- Metrics.distribution(
- PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_latency_ms");
-
- private transient HealthcareApiClient client;
- private final ObjectMapper mapper = new ObjectMapper();
-
- /**
- * Initialize healthcare client.
- *
- * @throws IOException If the Healthcare client cannot be created.
- */
- @Setup
- public void initClient() throws IOException {
- this.client = new HttpHealthcareApiClient();
- }
-
- @ProcessElement
- public void patchResources(ProcessContext context) {
- Iterable<Input> batch = context.element().getValue();
- for (Input patchParameter : batch) {
- try {
- long startTime = Instant.now().toEpochMilli();
- client.patchFhirResource(
- patchParameter.getResourceName(),
- patchParameter.getPatch(),
- patchParameter.getQuery());
- PATCH_RESOURCES_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime);
- PATCH_RESOURCES_SUCCESS.inc();
- context.output(Write.SUCCESSFUL_BODY, patchParameter.toString());
- } catch (IOException | HealthcareHttpException e) {
- PATCH_RESOURCES_ERRORS.inc();
- context.output(Write.FAILED_BODY, HealthcareIOError.of(patchParameter.toString(), e));
- }
- }
- }
- }
- }
-
/** Export FHIR resources from a FHIR store to new line delimited json files on GCS. */
public static class Export extends PTransform<PBegin, PCollection<String>> {
-
private final ValueProvider<String> fhirStore;
private final ValueProvider<String> exportGcsUriPrefix;
@@ -1597,7 +1481,6 @@
/** Deidentify FHIR resources from a FHIR store to a destination FHIR store. */
public static class Deidentify extends PTransform<PBegin, PCollection<String>> {
-
private final ValueProvider<String> sourceFhirStore;
private final ValueProvider<String> destinationFhirStore;
private final ValueProvider<DeidentifyConfig> deidConfig;
@@ -1668,7 +1551,6 @@
/** The type Search. */
public static class Search<T>
extends PTransform<PCollection<FhirSearchParameter<T>>, FhirIO.Search.Result> {
-
private static final Logger LOG = LoggerFactory.getLogger(Search.class);
private final ValueProvider<String> fhirStore;
@@ -1682,7 +1564,6 @@
}
public static class Result implements POutput, PInput {
-
private PCollection<KV<String, JsonArray>> keyedResources;
private PCollection<JsonArray> resources;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
index db34d33..6f80d86 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
@@ -46,26 +46,11 @@
this.queries = queries;
}
- /**
- * Creates a FhirSearchParameter to represent a FHIR Search request.
- *
- * @param resourceType resource type for search, leave empty for all
- * @param key optional key to index searches by
- * @param queries search query, with field as key and search as value
- * @return FhirSearchParameter
- */
public static <T> FhirSearchParameter<T> of(
String resourceType, @Nullable String key, @Nullable Map<String, T> queries) {
return new FhirSearchParameter<>(resourceType, key, queries);
}
- /**
- * Creates a FhirSearchParameter to represent a FHIR Search request.
- *
- * @param resourceType resource type for search, leave empty for all
- * @param queries search query, with field as key and search as value
- * @return FhirSearchParameter
- */
public static <T> FhirSearchParameter<T> of(
String resourceType, @Nullable Map<String, T> queries) {
return new FhirSearchParameter<>(resourceType, null, queries);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
index 81b2a75..f4e38c4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
@@ -162,27 +162,13 @@
throws IOException, HealthcareHttpException;
/**
- * Patch fhir resource http body.
- *
- * @param resourceName the resource name, in format
- * projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}[/{id}], id not
- * present when queryString is specified.
- * @param patch the patch operation
- * @param query optional query for conditional patches
- * @return the http body
- */
- HttpBody patchFhirResource(String resourceName, String patch, @Nullable Map<String, String> query)
- throws IOException, HealthcareHttpException;
-
- /**
* Read fhir resource http body.
*
- * @param resourceName the resource name, in format
- * projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}
+ * @param resourceId the resource
* @return the http body
* @throws IOException the io exception
*/
- HttpBody readFhirResource(String resourceName) throws IOException;
+ HttpBody readFhirResource(String resourceId) throws IOException;
/**
* Search fhir resource http body.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
index 929fe2a..283d011 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
@@ -100,7 +100,6 @@
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class HttpHealthcareApiClient implements HealthcareApiClient, Serializable {
-
private static final String USER_AGENT =
String.format(
"apache-beam-io-google-cloud-platform-healthcare/%s",
@@ -108,7 +107,6 @@
private static final String FHIRSTORE_HEADER_CONTENT_TYPE = "application/fhir+json";
private static final String FHIRSTORE_HEADER_ACCEPT = "application/fhir+json; charset=utf-8";
private static final String FHIRSTORE_HEADER_ACCEPT_CHARSET = "utf-8";
- private static final String FHIRSTORE_PATCH_CONTENT_TYPE = "application/json-patch+json";
private static final Logger LOG = LoggerFactory.getLogger(HttpHealthcareApiClient.class);
private transient CloudHealthcare client;
private transient HttpClient httpClient;
@@ -596,61 +594,11 @@
return responseModel;
}
- @Override
- public HttpBody patchFhirResource(
- String resourceName, String patch, @Nullable Map<String, String> query)
- throws IOException, HealthcareHttpException {
- if (httpClient == null || client == null) {
- initClient();
- }
-
- credentials.refreshIfExpired();
- StringEntity requestEntity = new StringEntity(patch, ContentType.APPLICATION_JSON);
- URI uri;
- try {
- URIBuilder uriBuilder = new URIBuilder(client.getRootUrl() + "v1beta1/" + resourceName);
- if (query != null) {
- for (Map.Entry<String, String> q : query.entrySet()) {
- uriBuilder.addParameter(q.getKey(), q.getValue());
- }
- }
- uri = uriBuilder.build();
- } catch (URISyntaxException e) {
- LOG.error("URL error when making patch request to FHIR API. " + e.getMessage());
- throw new IllegalArgumentException(e);
- }
-
- RequestBuilder requestBuilder =
- RequestBuilder.patch()
- .setUri(uri)
- .setEntity(requestEntity)
- .addHeader("Authorization", "Bearer " + credentials.getAccessToken().getTokenValue())
- .addHeader("User-Agent", USER_AGENT)
- .addHeader("Content-Type", FHIRSTORE_PATCH_CONTENT_TYPE)
- .addHeader("Accept", FHIRSTORE_HEADER_ACCEPT)
- .addHeader("Accept-Charset", FHIRSTORE_HEADER_ACCEPT_CHARSET);
-
- HttpUriRequest request = requestBuilder.build();
- HttpResponse response = httpClient.execute(request);
- HttpEntity responseEntity = response.getEntity();
- String content = EntityUtils.toString(responseEntity);
-
- // Check 2XX code.
- int statusCode = response.getStatusLine().getStatusCode();
- if (!(statusCode / 100 == 2)) {
- throw HealthcareHttpException.of(statusCode, content);
- }
- HttpBody responseModel = new HttpBody();
- responseModel.setData(content);
- return responseModel;
- }
-
/**
* Wraps {@link HttpResponse} in an exception with a statusCode field for use with {@link
* HealthcareIOError}.
*/
public static class HealthcareHttpException extends Exception {
-
private final int statusCode;
private HealthcareHttpException(int statusCode, String message) {
@@ -682,15 +630,8 @@
}
@Override
- public HttpBody readFhirResource(String resourceName) throws IOException {
- return client
- .projects()
- .locations()
- .datasets()
- .fhirStores()
- .fhir()
- .read(resourceName)
- .execute();
+ public HttpBody readFhirResource(String resourceId) throws IOException {
+ return client.projects().locations().datasets().fhirStores().fhir().read(resourceId).execute();
}
@Override
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java
deleted file mode 100644
index 835d717..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.healthcare;
-
-import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE;
-
-import com.google.api.services.healthcare.v1beta1.model.HttpBody;
-import java.io.IOException;
-import java.security.SecureRandom;
-import java.util.Arrays;
-import java.util.Collection;
-import org.apache.beam.runners.direct.DirectOptions;
-import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-public class FhirIOPatchIT {
-
- public String version;
-
- @Parameters(name = "{0}")
- public static Collection<String> versions() {
- return Arrays.asList("R4");
- }
-
- @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
- private final String project;
- private transient HealthcareApiClient client;
- private static String healthcareDataset;
- private String fhirStoreId;
- private String resourceName;
- private static final String BASE_STORE_ID =
- "FHIR_store_patch_it_" + System.currentTimeMillis() + "_" + (new SecureRandom().nextInt(32));
-
- public FhirIOPatchIT(String version) {
- this.version = version;
- this.fhirStoreId = BASE_STORE_ID + version;
- this.project =
- TestPipeline.testingPipelineOptions()
- .as(HealthcareStoreTestPipelineOptions.class)
- .getStoreProjectId();
- }
-
- @Before
- public void setup() throws Exception {
- healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project);
- if (client == null) {
- this.client = new HttpHealthcareApiClient();
- }
- client.createFhirStore(healthcareDataset, fhirStoreId, version, "");
-
- resourceName = healthcareDataset + "/fhirStores/" + fhirStoreId + "/fhir/Patient/123";
- String bundle =
- "{\"resourceType\":\"Bundle\","
- + "\"type\":\"transaction\","
- + "\"entry\": [{"
- + "\"request\":{\"method\":\"PUT\",\"url\":\"Patient/123\"},"
- + "\"resource\":{\"resourceType\":\"Patient\",\"id\":\"123\",\"birthDate\": \"1990-01-01\"}"
- + "}]}";
- FhirIOTestUtil.executeFhirBundles(
- client, healthcareDataset + "/fhirStores/" + fhirStoreId, ImmutableList.of(bundle));
- }
-
- @After
- public void teardown() throws IOException {
- HealthcareApiClient client = new HttpHealthcareApiClient();
- for (String version : versions()) {
- client.deleteFhirStore(healthcareDataset + "/fhirStores/" + BASE_STORE_ID + version);
- }
- }
-
- @Test
- public void testFhirIOPatch() throws IOException {
- pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
-
- Input patchParameter =
- Input.builder()
- .setResourceName(resourceName)
- .setPatch(
- "[{\"op\": \"replace\", \"path\": \"/birthDate\", \"value\": \"1997-05-23\"}]")
- .build();
- String expectedSuccessBody = patchParameter.toString();
-
- // Execute patch.
- PCollection<Input> patches = pipeline.apply(Create.of(patchParameter));
- FhirIO.Write.Result result = patches.apply(FhirIO.patchResources());
-
- // Validate beam results.
- PAssert.that(result.getFailedBodies()).empty();
- PCollection<String> successfulBodies = result.getSuccessfulBodies();
- PAssert.that(successfulBodies)
- .satisfies(
- input -> {
- for (String body : input) {
- Assert.assertEquals(expectedSuccessBody, body);
- }
- return null;
- });
-
- pipeline.run().waitUntilFinish();
-
- // Validate FHIR store contents.
- HttpBody readResult = client.readFhirResource(resourceName);
- Assert.assertEquals("1997-05-23", readResult.get("birthDate"));
- }
-
- @Test
- public void testFhirIOPatch_ifMatch() throws IOException {
- pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
-
- Input patchParameter =
- Input.builder()
- .setResourceName(healthcareDataset + "/fhirStores/" + fhirStoreId + "/fhir/Patient")
- .setPatch(
- "[{\"op\": \"replace\", \"path\": \"/birthDate\", \"value\": \"1997-06-23\"}]")
- .setQuery(ImmutableMap.of("birthDate", "1990-01-01"))
- .build();
- String expectedSuccessBody = patchParameter.toString();
-
- // Execute patch.
- PCollection<Input> patches = pipeline.apply(Create.of(patchParameter));
- FhirIO.Write.Result result = patches.apply(FhirIO.patchResources());
-
- // Validate beam results.
- PAssert.that(result.getFailedBodies()).empty();
- PCollection<String> successfulBodies = result.getSuccessfulBodies();
- PAssert.that(successfulBodies)
- .satisfies(
- input -> {
- for (String body : input) {
- Assert.assertEquals(expectedSuccessBody, body);
- }
- return null;
- });
-
- pipeline.run().waitUntilFinish();
-
- // Validate FHIR store contents.
- HttpBody readResult = client.readFhirResource(resourceName);
- Assert.assertEquals("1997-06-23", readResult.get("birthDate"));
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
index eecd371..8313a01 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
@@ -23,7 +23,6 @@
import java.util.List;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
@@ -32,7 +31,6 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -135,27 +133,6 @@
pipeline.run();
}
- @Test
- public void test_FhirIO_failedPatch() {
- Input badPatch = Input.builder().setPatch("").setResourceName("").build();
- PCollection<Input> patches = pipeline.apply(Create.of(ImmutableList.of(badPatch)));
- FhirIO.Write.Result patchResult = patches.apply(FhirIO.patchResources());
-
- PCollection<HealthcareIOError<String>> failedInserts = patchResult.getFailedBodies();
-
- PAssert.thatSingleton(failedInserts)
- .satisfies(
- (HealthcareIOError<String> err) -> {
- Assert.assertEquals(badPatch.toString(), err.getDataResource());
- return null;
- });
- PCollection<Long> numFailedInserts = failedInserts.apply(Count.globally());
-
- PAssert.thatSingleton(numFailedInserts).isEqualTo(1L);
-
- pipeline.run();
- }
-
private static final long NUM_ELEMENTS = 11;
private static ArrayList<KV<String, String>> createTestData() {