| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.gcp.healthcare; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; |
| |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.api.services.healthcare.v1beta1.model.DeidentifyConfig; |
| import com.google.api.services.healthcare.v1beta1.model.HttpBody; |
| import com.google.api.services.healthcare.v1beta1.model.Operation; |
| import com.google.auto.value.AutoValue; |
| import com.google.gson.Gson; |
| import com.google.gson.JsonArray; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.WritableByteChannel; |
| import java.nio.charset.StandardCharsets; |
| import java.time.Instant; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Optional; |
| import java.util.UUID; |
| import java.util.concurrent.ThreadLocalRandom; |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.coders.IterableCoder; |
| import org.apache.beam.sdk.coders.KvCoder; |
| import org.apache.beam.sdk.coders.StringUtf8Coder; |
| import org.apache.beam.sdk.coders.TextualIntegerCoder; |
| import org.apache.beam.sdk.coders.VoidCoder; |
| import org.apache.beam.sdk.io.FileIO; |
| import org.apache.beam.sdk.io.FileSystems; |
| import org.apache.beam.sdk.io.TextIO; |
| import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; |
| import org.apache.beam.sdk.io.fs.MatchResult; |
| import org.apache.beam.sdk.io.fs.MatchResult.Metadata; |
| import org.apache.beam.sdk.io.fs.MatchResult.Status; |
| import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; |
| import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; |
| import org.apache.beam.sdk.io.fs.ResourceId; |
| import org.apache.beam.sdk.io.fs.ResourceIdCoder; |
| import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException; |
| import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; |
| import org.apache.beam.sdk.metrics.Counter; |
| import org.apache.beam.sdk.metrics.Distribution; |
| import org.apache.beam.sdk.metrics.Metrics; |
| import org.apache.beam.sdk.options.ValueProvider; |
| import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.GroupIntoBatches; |
| import org.apache.beam.sdk.transforms.MapElements; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.Wait; |
| import org.apache.beam.sdk.transforms.WithKeys; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PBegin; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PCollection.IsBounded; |
| import org.apache.beam.sdk.values.PCollectionTuple; |
| import org.apache.beam.sdk.values.PInput; |
| import org.apache.beam.sdk.values.POutput; |
| import org.apache.beam.sdk.values.PValue; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.sdk.values.TupleTagList; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.apache.beam.sdk.values.TypeDescriptors; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; |
| import org.checkerframework.checker.nullness.qual.Nullable; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * {@link FhirIO} provides an API for reading and writing resources to <a |
| * href="https://cloud.google.com/healthcare/docs/concepts/fhir">Google Cloud Healthcare Fhir API. |
| * </a> |
| * |
| * <h3>Reading</h3> |
| * |
| * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use cases where you have a |
| * ${@link PCollection} of message IDs. This is appropriate for reading the Fhir notifications from |
| * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a manually |
| * prepared list of messages that you need to process (e.g. in a text file read with {@link |
| * org.apache.beam.sdk.io.TextIO}*) . |
| * |
| * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of message ID strings |
| * {@link FhirIO.Read.Result} where one can call {@link Read.Result#getResources()} to retrieve a |
| * {@link PCollection} containing the successfully fetched {@link String}s and/or {@link |
| * FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of {@link |
| * HealthcareIOError}* containing the resource ID that could not be fetched and the exception as a |
| * {@link HealthcareIOError}, this can be used to write to the dead letter storage system of your |
| * choosing. This error handling is mainly to transparently surface errors where the upstream {@link |
| * PCollection}* contains IDs that are not valid or are not reachable due to permissions issues. |
| * |
| * <h3>Writing</h3> |
| * |
| * <p>Write Resources can be written to FHIR with two different methods: Import or Execute Bundle. |
| * |
| * <p>Execute Bundle This is best for use cases where you are writing to a non-empty FHIR store with |
| * other clients or otherwise need referential integrity (e.g. A Streaming HL7v2 to FHIR ETL |
| * pipeline). |
| * |
| * <p>Import This is best for use cases where you are populating an empty FHIR store with no other |
| * clients. It is faster than the execute bundles method but does not respect referential integrity |
| * and the resources are not written transactionally (e.g. a historicaly backfill on a new FHIR |
| * store) This requires each resource to contain a client provided ID. It is important that when |
| * using import you give the appropriate permissions to the Google Cloud Healthcare Service Agent. |
| * |
| * <p>Export This is to export FHIR resources from a FHIR store to Google Cloud Storage. The output |
| * resources are in ndjson (newline delimited json) of FHIR resources. It is important that when |
| * using export you give the appropriate permissions to the Google Cloud Healthcare Service Agent. |
| * |
| * <p>Deidentify This is to de-identify FHIR resources from a source FHIR store and write the result |
| * to a destination FHIR store. It is important that the destination store must already exist. |
| * |
| * <p>Search This is to search FHIR resources within a given FHIR store. The inputs are individual |
| * FHIR Search queries, represented by the FhirSearchParameter class. The outputs are results of |
| * each Search, represented as a Json array of FHIR resources in string form, with pagination |
| * handled, and an optional input key. |
| * |
| * @see <a |
| * href=>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a> |
| * @see <a |
| * href=>https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions></a> |
| * @see <a |
| * href=>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/import></a> |
| * @see <a |
| * href=>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/export></a> |
| * @see <a |
| * href=>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/deidentify></a> |
| * @see <a |
| * href=>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/search></a> |
| * A {@link PCollection} of {@link String} can be ingested into an Fhir store using {@link |
| * FhirIO.Write#fhirStoresImport(String, String, String, FhirIO.Import.ContentStructure)} This |
| * will return a {@link FhirIO.Write.Result} on which you can call {@link |
| * FhirIO.Write.Result#getFailedBodies()} to retrieve a {@link PCollection} of {@link |
| * HealthcareIOError} containing the {@link String} that failed to be ingested and the |
| * exception. |
| * <p>Example |
| * <pre>{@code |
| * Pipeline pipeline = ... |
| * |
| * // Tail the FHIR store by retrieving resources based on Pub/Sub notifications. |
| * FhirIO.Read.Result readResult = p |
| * .apply("Read FHIR notifications", |
| * PubsubIO.readStrings().fromSubscription(options.getNotificationSubscription())) |
| * .apply(FhirIO.readResources()); |
| * |
| * // happily retrived messages |
| * PCollection<String> resources = readResult.getResources(); |
| * // message IDs that couldn't be retrieved + error context |
| * PCollection<HealthcareIOError<String>> failedReads = readResult.getFailedReads(); |
| * |
| * failedReads.apply("Write Message IDs / Stacktrace for Failed Reads to BigQuery", |
| * BigQueryIO |
| * .write() |
| * .to(option.getBQFhirExecuteBundlesDeadLetterTable()) |
| * .withFormatFunction(new HealthcareIOErrorToTableRow())); |
| * |
| * output = resources.apply("Happy path transformations", ...); |
| * FhirIO.Write.Result writeResult = |
| * output.apply("Execute FHIR Bundles", FhirIO.executeBundles(options.getExistingFhirStore())); |
| * |
| * PCollection<HealthcareIOError<String>> failedBundles = writeResult.getFailedInsertsWithErr(); |
| * |
| * failedBundles.apply("Write failed bundles to BigQuery", |
| * BigQueryIO |
| * .write() |
| * .to(option.getBQFhirExecuteBundlesDeadLetterTable()) |
| * .withFormatFunction(new HealthcareIOErrorToTableRow())); |
| * |
| * // Alternatively you could use import for high throughput to a new store. |
| * FhirIO.Write.Result writeResult = |
| * output.apply("Import FHIR Resources", FhirIO.executeBundles(options.getNewFhirStore())); |
| * |
| * // Export FHIR resources to Google Cloud Storage. |
| * String fhirStoreName = ...; |
| * String exportGcsUriPrefix = ...; |
| * PCollection<String> resources = |
| * pipeline.apply(FhirIO.exportResourcesToGcs(fhirStoreName, exportGcsUriPrefix)); |
| * |
| * // De-identify FHIR resources. |
| * String sourceFhirStoreName = ...; |
| * String destinationFhirStoreName = ...; |
| * DeidentifyConfig deidConfig = new DeidentifyConfig(); // use default DeidentifyConfig |
| * pipeline.apply(FhirIO.deidentify(fhirStoreName, destinationFhirStoreName, deidConfig)); |
| * |
| * // Search FHIR resources using an "OR" query. |
| * Map<String, String> queries = new HashMap<>(); |
| * queries.put("name", "Alice,Bob"); |
| * FhirSearchParameter<String> searchParameter = FhirSearchParameter.of("Patient", queries); |
| * PCollection<FhirSearchParameter<String>> searchQueries = |
| * pipeline.apply( |
| * Create.of(searchParameter) |
| * .withCoder(FhirSearchParameterCoder.of(StringUtf8Coder.of()))); |
| * FhirIO.Search.Result searchResult = |
| * searchQueries.apply(FhirIO.searchResources(options.getFhirStore())); |
| * PCollection<JsonArray> resources = searchResult.getResources(); // JsonArray of results |
| * |
| * // Search FHIR resources using an "AND" query with a key. |
| * Map<String, List<String>> listQueries = new HashMap<>(); |
| * listQueries.put("name", Arrays.asList("Alice", "Bob")); |
| * FhirSearchParameter<List<String>> listSearchParameter = |
| * FhirSearchParameter.of("Patient", "Alice-Bob-Search", listQueries); |
| * PCollection<FhirSearchParameter<List<String>>> listSearchQueries = |
| * pipeline.apply( |
| * Create.of(listSearchParameter) |
| * .withCoder(FhirSearchParameterCoder.of(ListCoder.of(StringUtf8Coder.of())))); |
| * FhirIO.Search.Result listSearchResult = |
| * searchQueries.apply(FhirIO.searchResources(options.getFhirStore())); |
| * PCollection<KV<String, JsonArray>> listResource = |
| * listSearchResult.getKeyedResources(); // KV<"Alice-Bob-Search", JsonArray of results> |
| * |
| * </pre> |
| */ |
| @SuppressWarnings({ |
| "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) |
| }) |
| public class FhirIO { |
| private static final String BASE_METRIC_PREFIX = "fhirio/"; |
| private static final String LRO_COUNTER_KEY = "counter"; |
| private static final String LRO_SUCCESS_KEY = "success"; |
| private static final String LRO_FAILURE_KEY = "failure"; |
| private static final Logger LOG = LoggerFactory.getLogger(FhirIO.class); |
| |
| /** |
| * Read resources from a PCollection of resource IDs (e.g. when subscribing the pubsub |
| * notifications) |
| * |
| * @return the read |
| * @see Read |
| */ |
| public static Read readResources() { |
| return new Read(); |
| } |
| |
| /** |
| * Search resources from a Fhir store with String parameter values. |
| * |
| * @return the search |
| * @see Search |
| */ |
| public static Search<String> searchResources(String fhirStore) { |
| return new Search<String>(fhirStore); |
| } |
| |
| /** |
| * Search resources from a Fhir store with any type of parameter values. |
| * |
| * @return the search |
| * @see Search |
| */ |
| public static Search<? extends Object> searchResourcesWithGenericParameters(String fhirStore) { |
| return new Search<>(fhirStore); |
| } |
| |
| /** |
| * Import resources. Intended for use on empty FHIR stores |
| * |
| * @param fhirStore the fhir store |
| * @param tempDir the temp dir |
| * @param deadLetterDir the dead letter dir |
| * @param contentStructure the content structure |
| * @return the import |
| * @see Import |
| */ |
| public static Import importResources( |
| String fhirStore, |
| String tempDir, |
| String deadLetterDir, |
| FhirIO.Import.@Nullable ContentStructure contentStructure) { |
| return new Import(fhirStore, tempDir, deadLetterDir, contentStructure); |
| } |
| |
| /** |
| * Import resources. Intended for use on empty FHIR stores |
| * |
| * @param fhirStore the fhir store |
| * @param tempDir the temp dir |
| * @param deadLetterDir the dead letter dir |
| * @param contentStructure the content structure |
| * @return the import |
| * @see Import |
| */ |
| public static Import importResources( |
| ValueProvider<String> fhirStore, |
| ValueProvider<String> tempDir, |
| ValueProvider<String> deadLetterDir, |
| FhirIO.Import.@Nullable ContentStructure contentStructure) { |
| return new Import(fhirStore, tempDir, deadLetterDir, contentStructure); |
| } |
| |
| /** |
| * Export resources to GCS. Intended for use on non-empty FHIR stores |
| * |
| * @param fhirStore the fhir store, in the format: |
| * projects/project_id/locations/location_id/datasets/dataset_id/fhirStores/fhir_store_id |
| * @param exportGcsUriPrefix the destination GCS dir, in the format: |
| * gs://YOUR_BUCKET_NAME/path/to/a/dir |
| * @return the export |
| * @see Export |
| */ |
| public static Export exportResourcesToGcs(String fhirStore, String exportGcsUriPrefix) { |
| return new Export( |
| StaticValueProvider.of(fhirStore), StaticValueProvider.of(exportGcsUriPrefix)); |
| } |
| |
| /** |
| * Export resources to GCS. Intended for use on non-empty FHIR stores |
| * |
| * @param fhirStore the fhir store, in the format: |
| * projects/project_id/locations/location_id/datasets/dataset_id/fhirStores/fhir_store_id |
| * @param exportGcsUriPrefix the destination GCS dir, in the format: |
| * gs://YOUR_BUCKET_NAME/path/to/a/dir |
| * @return the export |
| * @see Export |
| */ |
| public static Export exportResourcesToGcs( |
| ValueProvider<String> fhirStore, ValueProvider<String> exportGcsUriPrefix) { |
| return new Export(fhirStore, exportGcsUriPrefix); |
| } |
| |
| /** |
| * Deidentify FHIR resources. Intended for use on non-empty FHIR stores |
| * |
| * @param sourceFhirStore the source fhir store, in the format: |
| * projects/project_id/locations/location_id/datasets/dataset_id/fhirStores/fhir_store_id |
| * @param destinationFhirStore the destination fhir store to write de-identified resources, in the |
| * format: |
| * projects/project_id/locations/location_id/datasets/dataset_id/fhirStores/fhir_store_id |
| * @param deidConfig the DeidentifyConfig |
| * @return the deidentify |
| * @see Deidentify |
| */ |
| public static Deidentify deidentify( |
| String sourceFhirStore, String destinationFhirStore, DeidentifyConfig deidConfig) { |
| return new Deidentify( |
| StaticValueProvider.of(sourceFhirStore), |
| StaticValueProvider.of(destinationFhirStore), |
| StaticValueProvider.of(deidConfig)); |
| } |
| |
| /** |
| * Deidentify FHIR resources. Intended for use on non-empty FHIR stores |
| * |
| * @param sourceFhirStore the source fhir store, in the format: |
| * projects/project_id/locations/location_id/datasets/dataset_id/fhirStores/fhir_store_id |
| * @param destinationFhirStore the destination fhir store to write de-identified resources, in the |
| * format: |
| * projects/project_id/locations/location_id/datasets/dataset_id/fhirStores/fhir_store_id |
| * @param deidConfig the DeidentifyConfig |
| * @return the deidentify |
| * @see Deidentify |
| */ |
| public static Deidentify deidentify( |
| ValueProvider<String> sourceFhirStore, |
| ValueProvider<String> destinationFhirStore, |
| ValueProvider<DeidentifyConfig> deidConfig) { |
| return new Deidentify(sourceFhirStore, destinationFhirStore, deidConfig); |
| } |
| |
| /** |
| * Increments success and failure counters for an LRO. To be used after the LRO has completed. |
| * This function leverages the fact that the LRO metadata is always of the format: "counter": { |
| * "success": "1", "failure": "1" } |
| * |
| * @param operation LRO operation object. |
| * @param successCounter the success counter for this operation. |
| * @param failureCounter the failure counter for this operation. |
| */ |
| private static void incrementLroCounters( |
| Operation operation, Counter successCounter, Counter failureCounter) { |
| Map<String, Object> opMetadata = operation.getMetadata(); |
| if (opMetadata.containsKey(LRO_COUNTER_KEY)) { |
| try { |
| Map<String, String> counters = (Map<String, String>) opMetadata.get(LRO_COUNTER_KEY); |
| if (counters.containsKey(LRO_SUCCESS_KEY)) { |
| successCounter.inc(Long.parseLong(counters.get(LRO_SUCCESS_KEY))); |
| } |
| if (counters.containsKey(LRO_FAILURE_KEY)) { |
| Long numFailures = Long.parseLong(counters.get(LRO_FAILURE_KEY)); |
| failureCounter.inc(numFailures); |
| if (numFailures > 0) { |
| LOG.error("LRO: " + operation.getName() + " had " + numFailures + " errors."); |
| } |
| } |
| } catch (Exception e) { |
| LOG.error("failed to increment LRO counters, error message: " + e.getMessage()); |
| } |
| } |
| } |
| |
| /** The type Read. */ |
| public static class Read extends PTransform<PCollection<String>, FhirIO.Read.Result> { |
| private static final Logger LOG = LoggerFactory.getLogger(Read.class); |
| |
| /** Instantiates a new Read. */ |
| public Read() {} |
| |
| /** The type Result. */ |
| public static class Result implements POutput, PInput { |
| private PCollection<String> resources; |
| |
| private PCollection<HealthcareIOError<String>> failedReads; |
| /** The Pct. */ |
| PCollectionTuple pct; |
| |
| /** |
| * Create FhirIO.Read.Result form PCollectionTuple with OUT and DEAD_LETTER tags. |
| * |
| * @param pct the pct |
| * @return the read result |
| * @throws IllegalArgumentException the illegal argument exception |
| */ |
| static FhirIO.Read.Result of(PCollectionTuple pct) throws IllegalArgumentException { |
| if (pct.has(OUT) && pct.has(DEAD_LETTER)) { |
| return new FhirIO.Read.Result(pct); |
| } else { |
| throw new IllegalArgumentException( |
| "The PCollection tuple must have the FhirIO.Read.OUT " |
| + "and FhirIO.Read.DEAD_LETTER tuple tags"); |
| } |
| } |
| |
| private Result(PCollectionTuple pct) { |
| this.pct = pct; |
| this.resources = pct.get(OUT); |
| this.failedReads = |
| pct.get(DEAD_LETTER).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); |
| } |
| |
| /** |
| * Gets failed reads. |
| * |
| * @return the failed reads |
| */ |
| public PCollection<HealthcareIOError<String>> getFailedReads() { |
| return failedReads; |
| } |
| |
| /** |
| * Gets resources. |
| * |
| * @return the resources |
| */ |
| public PCollection<String> getResources() { |
| return resources; |
| } |
| |
| @Override |
| public Pipeline getPipeline() { |
| return this.pct.getPipeline(); |
| } |
| |
| @Override |
| public Map<TupleTag<?>, PValue> expand() { |
| return ImmutableMap.of(OUT, resources); |
| } |
| |
| @Override |
| public void finishSpecifyingOutput( |
| String transformName, PInput input, PTransform<?, ?> transform) {} |
| } |
| |
| /** The tag for the main output of Fhir Messages. */ |
| public static final TupleTag<String> OUT = new TupleTag<String>() {}; |
| /** The tag for the deadletter output of Fhir Messages. */ |
| public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER = |
| new TupleTag<HealthcareIOError<String>>() {}; |
| |
| @Override |
| public FhirIO.Read.Result expand(PCollection<String> input) { |
| return input.apply("Fetch Fhir messages", new FetchResourceJsonString()); |
| } |
| |
| /** |
| * DoFn to fetch a resource from an Google Cloud Healthcare FHIR store based on resourceID |
| * |
| * <p>This DoFn consumes a {@link PCollection} of notifications {@link String}s from the FHIR |
| * store, and fetches the actual {@link String} object based on the id in the notification and |
| * will output a {@link PCollectionTuple} which contains the output and dead-letter {@link |
| * PCollection}*. |
| * |
| * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}: |
| * |
| * <ul> |
| * <li>{@link FhirIO.Read#OUT} - Contains all {@link PCollection} records successfully read |
| * from the Fhir store. |
| * <li>{@link FhirIO.Read#DEAD_LETTER} - Contains all {@link PCollection} of {@link |
| * HealthcareIOError}* of message IDs which failed to be fetched from the Fhir store, with |
| * error message and stacktrace. |
| * </ul> |
| */ |
| static class FetchResourceJsonString |
| extends PTransform<PCollection<String>, FhirIO.Read.Result> { |
| |
| /** Instantiates a new Fetch Fhir message DoFn. */ |
| public FetchResourceJsonString() {} |
| |
| @Override |
| public FhirIO.Read.Result expand(PCollection<String> resourceIds) { |
| return new FhirIO.Read.Result( |
| resourceIds.apply( |
| ParDo.of(new ReadResourceFn()) |
| .withOutputTags(FhirIO.Read.OUT, TupleTagList.of(FhirIO.Read.DEAD_LETTER)))); |
| } |
| |
| /** DoFn for fetching messages from the Fhir store with error handling. */ |
| static class ReadResourceFn extends DoFn<String, String> { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ReadResourceFn.class); |
| private static final Counter READ_RESOURCE_ERRORS = |
| Metrics.counter(ReadResourceFn.class, BASE_METRIC_PREFIX + "read_resource_error_count"); |
| private static final Counter READ_RESOURCE_SUCCESS = |
| Metrics.counter( |
| ReadResourceFn.class, BASE_METRIC_PREFIX + "read_resource_success_count"); |
| private static final Distribution READ_RESOURCE_LATENCY_MS = |
| Metrics.distribution( |
| ReadResourceFn.class, BASE_METRIC_PREFIX + "read_resource_latency_ms"); |
| |
| private HealthcareApiClient client; |
| private ObjectMapper mapper; |
| |
| /** Instantiates a new Hl 7 v 2 message get fn. */ |
| ReadResourceFn() {} |
| |
| /** |
| * Instantiate healthcare client. |
| * |
| * @throws IOException the io exception |
| */ |
| @Setup |
| public void instantiateHealthcareClient() throws IOException { |
| this.client = new HttpHealthcareApiClient(); |
| this.mapper = new ObjectMapper(); |
| } |
| |
| /** |
| * Process element. |
| * |
| * @param context the context |
| */ |
| @ProcessElement |
| public void processElement(ProcessContext context) { |
| String resourceId = context.element(); |
| try { |
| context.output(fetchResource(this.client, resourceId)); |
| } catch (Exception e) { |
| READ_RESOURCE_ERRORS.inc(); |
| LOG.warn( |
| String.format( |
| "Error fetching Fhir message with ID %s writing to Dead Letter " |
| + "Queue. Cause: %s Stack Trace: %s", |
| resourceId, e.getMessage(), Throwables.getStackTraceAsString(e))); |
| context.output(FhirIO.Read.DEAD_LETTER, HealthcareIOError.of(resourceId, e)); |
| } |
| } |
| |
| private String fetchResource(HealthcareApiClient client, String resourceId) |
| throws IOException, IllegalArgumentException { |
| long startTime = Instant.now().toEpochMilli(); |
| |
| HttpBody resource = client.readFhirResource(resourceId); |
| READ_RESOURCE_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime); |
| |
| if (resource == null) { |
| throw new IOException(String.format("GET request for %s returned null", resourceId)); |
| } |
| READ_RESOURCE_SUCCESS.inc(); |
| return mapper.writeValueAsString(resource); |
| } |
| } |
| } |
| } |
| |
| /** The type Write. */ |
| @AutoValue |
| public abstract static class Write extends PTransform<PCollection<String>, Write.Result> { |
| |
| /** The tag for successful writes to FHIR store. */ |
| public static final TupleTag<String> SUCCESSFUL_BODY = new TupleTag<String>() {}; |
| /** The tag for the failed writes to FHIR store. */ |
| public static final TupleTag<HealthcareIOError<String>> FAILED_BODY = |
| new TupleTag<HealthcareIOError<String>>() {}; |
| /** The tag for the files that failed to FHIR store. */ |
| public static final TupleTag<HealthcareIOError<String>> FAILED_FILES = |
| new TupleTag<HealthcareIOError<String>>() {}; |
| /** The tag for temp files for import to FHIR store. */ |
| public static final TupleTag<ResourceId> TEMP_FILES = new TupleTag<ResourceId>() {}; |
| |
| /** The enum Write method. */ |
| public enum WriteMethod { |
| /** |
| * Execute Bundle Method executes a batch of requests as a single transaction @see <a |
| * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>. |
| */ |
| EXECUTE_BUNDLE, |
| /** |
| * Import Method bulk imports resources from GCS. This is ideal for initial loads to empty |
| * FHIR stores. <a |
| * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/import></a>. |
| */ |
| IMPORT |
| } |
| |
| /** The type Result. */ |
| public static class Result implements POutput { |
| private final Pipeline pipeline; |
| private final PCollection<String> successfulBodies; |
| private final PCollection<HealthcareIOError<String>> failedBodies; |
| private final PCollection<HealthcareIOError<String>> failedFiles; |
| |
| /** |
| * Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}. |
| * |
| * @param pipeline the pipeline |
| * @param bodies the successful and failing bodies results. |
| * @return the result |
| */ |
| static Result in(Pipeline pipeline, PCollectionTuple bodies) throws IllegalArgumentException { |
| if (bodies.has(SUCCESSFUL_BODY) && bodies.has(FAILED_BODY)) { |
| return new Result(pipeline, bodies.get(SUCCESSFUL_BODY), bodies.get(FAILED_BODY), null); |
| } else { |
| throw new IllegalArgumentException( |
| "The PCollection tuple bodies must have the FhirIO.Write.SUCCESSFUL_BODY " |
| + "and FhirIO.Write.FAILED_BODY tuple tags."); |
| } |
| } |
| |
| static Result in( |
| Pipeline pipeline, |
| PCollection<HealthcareIOError<String>> failedBodies, |
| PCollection<HealthcareIOError<String>> failedFiles) { |
| return new Result(pipeline, null, failedBodies, failedFiles); |
| } |
| |
| /** |
| * Gets successful bodies from Write. |
| * |
| * @return the entries that were inserted |
| */ |
| public PCollection<String> getSuccessfulBodies() { |
| return this.successfulBodies; |
| } |
| |
| /** |
| * Gets failed bodies with err. |
| * |
| * @return the failed inserts with err |
| */ |
| public PCollection<HealthcareIOError<String>> getFailedBodies() { |
| return this.failedBodies; |
| } |
| |
| /** |
| * Gets failed file imports with err. |
| * |
| * @return the failed GCS uri with err |
| */ |
| public PCollection<HealthcareIOError<String>> getFailedFiles() { |
| return this.failedFiles; |
| } |
| |
| @Override |
| public Pipeline getPipeline() { |
| return this.pipeline; |
| } |
| |
| @Override |
| public Map<TupleTag<?>, PValue> expand() { |
| return ImmutableMap.of( |
| SUCCESSFUL_BODY, |
| successfulBodies, |
| FAILED_BODY, |
| failedBodies, |
| Write.FAILED_FILES, |
| failedFiles); |
| } |
| |
| @Override |
| public void finishSpecifyingOutput( |
| String transformName, PInput input, PTransform<?, ?> transform) {} |
| |
| private Result( |
| Pipeline pipeline, |
| @Nullable PCollection<String> successfulBodies, |
| PCollection<HealthcareIOError<String>> failedBodies, |
| @Nullable PCollection<HealthcareIOError<String>> failedFiles) { |
| this.pipeline = pipeline; |
| if (successfulBodies == null) { |
| successfulBodies = |
| (PCollection<String>) pipeline.apply(Create.empty(StringUtf8Coder.of())); |
| } |
| this.successfulBodies = successfulBodies; |
| this.failedBodies = failedBodies; |
| if (failedFiles == null) { |
| failedFiles = |
| (PCollection<HealthcareIOError<String>>) |
| pipeline.apply(Create.empty(HealthcareIOErrorCoder.of(StringUtf8Coder.of()))); |
| } |
| this.failedFiles = failedFiles; |
| } |
| } |
| |
| /** |
| * Gets Fhir store. |
| * |
| * @return the Fhir store |
| */ |
| abstract ValueProvider<String> getFhirStore(); |
| |
| /** |
| * Gets write method. |
| * |
| * @return the write method |
| */ |
| abstract WriteMethod getWriteMethod(); |
| |
| /** |
| * Gets content structure. |
| * |
| * @return the content structure |
| */ |
| abstract Optional<FhirIO.Import.ContentStructure> getContentStructure(); |
| |
| /** |
| * Gets import gcs temp path. |
| * |
| * @return the import gcs temp path |
| */ |
| abstract Optional<ValueProvider<String>> getImportGcsTempPath(); |
| |
| /** |
| * Gets import gcs dead letter path. |
| * |
| * @return the import gcs dead letter path |
| */ |
| abstract Optional<ValueProvider<String>> getImportGcsDeadLetterPath(); |
| |
| /** The type Builder. */ |
| @AutoValue.Builder |
| abstract static class Builder { |
| |
| /** |
| * Sets Fhir store. |
| * |
| * @param fhirStore the Fhir store |
| * @return the Fhir store |
| */ |
| abstract Builder setFhirStore(ValueProvider<String> fhirStore); |
| |
| /** |
| * Sets write method. |
| * |
| * @param writeMethod the write method |
| * @return the write method |
| */ |
| abstract Builder setWriteMethod(WriteMethod writeMethod); |
| |
| /** |
| * Sets content structure. |
| * |
| * @param contentStructure the content structure |
| * @return the content structure |
| */ |
| abstract Builder setContentStructure(FhirIO.Import.ContentStructure contentStructure); |
| |
| /** |
| * Sets import gcs temp path. |
| * |
| * @param gcsTempPath the gcs temp path |
| * @return the import gcs temp path |
| */ |
| abstract Builder setImportGcsTempPath(ValueProvider<String> gcsTempPath); |
| |
| /** |
| * Sets import gcs dead letter path. |
| * |
| * @param gcsDeadLetterPath the gcs dead letter path |
| * @return the import gcs dead letter path |
| */ |
| abstract Builder setImportGcsDeadLetterPath(ValueProvider<String> gcsDeadLetterPath); |
| |
| /** |
| * Build write. |
| * |
| * @return the write |
| */ |
| abstract Write build(); |
| } |
| |
| private static Write.Builder write(String fhirStore) { |
| return new AutoValue_FhirIO_Write.Builder().setFhirStore(StaticValueProvider.of(fhirStore)); |
| } |
| |
| /** |
| * Create Method creates a single FHIR resource. @see <a |
| * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create></a> |
| * |
| * @param fhirStore the hl 7 v 2 store |
| * @param gcsTempPath the gcs temp path |
| * @param gcsDeadLetterPath the gcs dead letter path |
| * @param contentStructure the content structure |
| * @return the write |
| */ |
| public static Write fhirStoresImport( |
| String fhirStore, |
| String gcsTempPath, |
| String gcsDeadLetterPath, |
| FhirIO.Import.@Nullable ContentStructure contentStructure) { |
| return new AutoValue_FhirIO_Write.Builder() |
| .setFhirStore(StaticValueProvider.of(fhirStore)) |
| .setWriteMethod(Write.WriteMethod.IMPORT) |
| .setContentStructure(contentStructure) |
| .setImportGcsTempPath(StaticValueProvider.of(gcsTempPath)) |
| .setImportGcsDeadLetterPath(StaticValueProvider.of(gcsDeadLetterPath)) |
| .build(); |
| } |
| |
| public static Write fhirStoresImport( |
| String fhirStore, |
| String gcsDeadLetterPath, |
| FhirIO.Import.@Nullable ContentStructure contentStructure) { |
| return new AutoValue_FhirIO_Write.Builder() |
| .setFhirStore(StaticValueProvider.of(fhirStore)) |
| .setWriteMethod(Write.WriteMethod.IMPORT) |
| .setContentStructure(contentStructure) |
| .setImportGcsDeadLetterPath(StaticValueProvider.of(gcsDeadLetterPath)) |
| .build(); |
| } |
| |
| public static Write fhirStoresImport( |
| ValueProvider<String> fhirStore, |
| ValueProvider<String> gcsTempPath, |
| ValueProvider<String> gcsDeadLetterPath, |
| FhirIO.Import.@Nullable ContentStructure contentStructure) { |
| return new AutoValue_FhirIO_Write.Builder() |
| .setFhirStore(fhirStore) |
| .setWriteMethod(Write.WriteMethod.IMPORT) |
| .setContentStructure(contentStructure) |
| .setImportGcsTempPath(gcsTempPath) |
| .setImportGcsDeadLetterPath(gcsDeadLetterPath) |
| .build(); |
| } |
| |
| /** |
| * Execute Bundle Method executes a batch of requests as a single transaction @see <a |
| * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>. |
| * |
| * @param fhirStore the fhir store |
| * @return the write |
| */ |
| public static Write executeBundles(String fhirStore) { |
| return new AutoValue_FhirIO_Write.Builder() |
| .setFhirStore(StaticValueProvider.of(fhirStore)) |
| .setWriteMethod(WriteMethod.EXECUTE_BUNDLE) |
| .build(); |
| } |
| |
| /** |
| * Execute bundles write. |
| * |
| * @param fhirStore the fhir store |
| * @return the write |
| */ |
| public static Write executeBundles(ValueProvider<String> fhirStore) { |
| return new AutoValue_FhirIO_Write.Builder() |
| .setFhirStore(fhirStore) |
| .setWriteMethod(WriteMethod.EXECUTE_BUNDLE) |
| .build(); |
| } |
| |
| private static final Logger LOG = LoggerFactory.getLogger(Write.class); |
| |
| @Override |
| public Result expand(PCollection<String> input) { |
| PCollectionTuple bundles; |
| switch (this.getWriteMethod()) { |
| case IMPORT: |
| LOG.warn( |
| "Make sure the Cloud Healthcare Service Agent has permissions when using import:" |
| + " https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions"); |
| ValueProvider<String> deadPath = |
| getImportGcsDeadLetterPath().orElseThrow(IllegalArgumentException::new); |
| FhirIO.Import.ContentStructure contentStructure = |
| getContentStructure().orElseThrow(IllegalArgumentException::new); |
| ValueProvider<String> tempPath = |
| getImportGcsTempPath() |
| .orElse( |
| StaticValueProvider.of(input.getPipeline().getOptions().getTempLocation())); |
| |
| return input.apply(new Import(getFhirStore(), tempPath, deadPath, contentStructure)); |
| case EXECUTE_BUNDLE: |
| default: |
| bundles = |
| input.apply( |
| "Execute FHIR Bundles", |
| ParDo.of(new ExecuteBundles.ExecuteBundlesFn(this.getFhirStore())) |
| .withOutputTags(SUCCESSFUL_BODY, TupleTagList.of(FAILED_BODY))); |
| bundles.get(SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of()); |
| bundles.get(FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); |
| } |
| return Result.in(input.getPipeline(), bundles); |
| } |
| } |
| |
| /** |
| * Writes each bundle of elements to a new-line delimited JSON file on GCS and issues a |
| * fhirStores.import Request for that file. This is intended for batch use only to facilitate |
| * large backfills to empty FHIR stores and should not be used with unbounded PCollections. If |
| * your use case is streaming checkout using {@link ExecuteBundles} to more safely execute bundles |
| * as transactions which is safer practice for a use on a "live" FHIR store. |
| */ |
| public static class Import extends Write { |
| |
| private final ValueProvider<String> fhirStore; |
| private final ValueProvider<String> deadLetterGcsPath; |
| private final ContentStructure contentStructure; |
| private static final int DEFAULT_FILES_PER_BATCH = 10000; |
| private static final Logger LOG = LoggerFactory.getLogger(Import.class); |
| private ValueProvider<String> tempGcsPath; |
| |
| /* |
| * Instantiates a new Import. |
| * |
| * @param fhirStore the fhir store |
| * @param tempGcsPath the temp gcs path |
| * @param deadLetterGcsPath the dead letter gcs path |
| * @param contentStructure the content structure |
| */ |
| Import( |
| ValueProvider<String> fhirStore, |
| ValueProvider<String> tempGcsPath, |
| ValueProvider<String> deadLetterGcsPath, |
| @Nullable ContentStructure contentStructure) { |
| this.fhirStore = fhirStore; |
| this.tempGcsPath = tempGcsPath; |
| this.deadLetterGcsPath = deadLetterGcsPath; |
| if (contentStructure == null) { |
| this.contentStructure = ContentStructure.CONTENT_STRUCTURE_UNSPECIFIED; |
| } else { |
| this.contentStructure = contentStructure; |
| } |
| } |
| |
| Import( |
| ValueProvider<String> fhirStore, |
| ValueProvider<String> deadLetterGcsPath, |
| @Nullable ContentStructure contentStructure) { |
| this(fhirStore, null, deadLetterGcsPath, contentStructure); |
| } |
| |
| /** |
| * Instantiates a new Import. |
| * |
| * @param fhirStore the fhir store |
| * @param tempGcsPath the temp gcs path |
| * @param deadLetterGcsPath the dead letter gcs path |
| * @param contentStructure the content structure |
| */ |
| Import( |
| String fhirStore, |
| String tempGcsPath, |
| String deadLetterGcsPath, |
| @Nullable ContentStructure contentStructure) { |
| this.fhirStore = StaticValueProvider.of(fhirStore); |
| this.tempGcsPath = StaticValueProvider.of(tempGcsPath); |
| this.deadLetterGcsPath = StaticValueProvider.of(deadLetterGcsPath); |
| if (contentStructure == null) { |
| this.contentStructure = ContentStructure.CONTENT_STRUCTURE_UNSPECIFIED; |
| } else { |
| this.contentStructure = contentStructure; |
| } |
| } |
| |
| @Override |
| ValueProvider<String> getFhirStore() { |
| return fhirStore; |
| } |
| |
| @Override |
| WriteMethod getWriteMethod() { |
| return WriteMethod.IMPORT; |
| } |
| |
| @Override |
| Optional<ContentStructure> getContentStructure() { |
| return Optional.of(contentStructure); |
| } |
| |
| @Override |
| Optional<ValueProvider<String>> getImportGcsTempPath() { |
| return Optional.of(tempGcsPath); |
| } |
| |
| @Override |
| Optional<ValueProvider<String>> getImportGcsDeadLetterPath() { |
| return Optional.of(deadLetterGcsPath); |
| } |
| |
| @Override |
| public Write.Result expand(PCollection<String> input) { |
| checkState( |
| input.isBounded() == IsBounded.BOUNDED, |
| "FhirIO.Import should only be used on bounded PCollections as it is" |
| + "intended for batch use only."); |
| |
| // fall back on pipeline's temp location. |
| ValueProvider<String> tempPath = |
| getImportGcsTempPath() |
| .orElse(StaticValueProvider.of(input.getPipeline().getOptions().getTempLocation())); |
| |
| // Write bundles of String to GCS |
| PCollectionTuple writeTmpFileResults = |
| input.apply( |
| "Write nd json to GCS", |
| ParDo.of(new WriteBundlesToFilesFn(fhirStore, tempPath, deadLetterGcsPath)) |
| .withOutputTags(Write.TEMP_FILES, TupleTagList.of(Write.FAILED_BODY))); |
| |
| PCollection<HealthcareIOError<String>> failedBodies = |
| writeTmpFileResults |
| .get(Write.FAILED_BODY) |
| .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); |
| int numShards = 100; |
| PCollection<HealthcareIOError<String>> failedFiles = |
| writeTmpFileResults |
| .get(Write.TEMP_FILES) |
| .apply( |
| "Shard files", // to paralelize group into batches |
| WithKeys.of(elm -> ThreadLocalRandom.current().nextInt(0, numShards))) |
| .setCoder(KvCoder.of(TextualIntegerCoder.of(), ResourceIdCoder.of())) |
| .apply("Assemble File Batches", GroupIntoBatches.ofSize(DEFAULT_FILES_PER_BATCH)) |
| .setCoder( |
| KvCoder.of(TextualIntegerCoder.of(), IterableCoder.of(ResourceIdCoder.of()))) |
| .apply( |
| "Import Batches", |
| ParDo.of(new ImportFn(fhirStore, tempPath, deadLetterGcsPath, contentStructure))) |
| .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); |
| |
| input |
| .getPipeline() |
| .apply("Instantiate Temp Path", Create.ofProvider(tempPath, StringUtf8Coder.of())) |
| .apply( |
| "Resolve SubDirs", |
| MapElements.into(TypeDescriptors.strings()) |
| .via((String path) -> path.endsWith("/") ? path + "*" : path + "/*")) |
| .apply("Wait On File Writing", Wait.on(failedBodies)) |
| .apply("Wait On FHIR Importing", Wait.on(failedFiles)) |
| .apply( |
| "Match tempGcsPath", |
| FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)) |
| .apply( |
| "Delete tempGcsPath", |
| ParDo.of( |
| new DoFn<Metadata, Void>() { |
| @ProcessElement |
| public void delete(@Element Metadata path, ProcessContext context) { |
| // Wait til window closes for failedBodies and failedFiles to ensure we are |
| // done processing anything under tempGcsPath because it has been |
| // successfully imported to FHIR store or copies have been moved to the |
| // dead letter path. |
| // Clean up all of tempGcsPath. This will handle removing phantom temporary |
| // objects from failed / rescheduled ImportFn::importBatch. |
| try { |
| FileSystems.delete( |
| Collections.singleton(path.resourceId()), |
| StandardMoveOptions.IGNORE_MISSING_FILES); |
| } catch (IOException e) { |
| LOG.error("error cleaning up tempGcsDir: %s", e); |
| } |
| } |
| })) |
| .setCoder(VoidCoder.of()); |
| |
| return Write.Result.in(input.getPipeline(), failedBodies, failedFiles); |
| } |
| |
| /** The Write bundles to new line delimited json files. */ |
| static class WriteBundlesToFilesFn extends DoFn<String, ResourceId> { |
| |
| private final ValueProvider<String> fhirStore; |
| private final ValueProvider<String> tempGcsPath; |
| private final ValueProvider<String> deadLetterGcsPath; |
| private ObjectMapper mapper; |
| private ResourceId resourceId; |
| private WritableByteChannel ndJsonChannel; |
| private BoundedWindow window; |
| |
| private transient HealthcareApiClient client; |
| private static final Logger LOG = LoggerFactory.getLogger(WriteBundlesToFilesFn.class); |
| |
| WriteBundlesToFilesFn( |
| ValueProvider<String> fhirStore, |
| ValueProvider<String> tempGcsPath, |
| ValueProvider<String> deadLetterGcsPath) { |
| this.fhirStore = fhirStore; |
| this.tempGcsPath = tempGcsPath; |
| this.deadLetterGcsPath = deadLetterGcsPath; |
| } |
| |
| /** |
| * Instantiates a new Import fn. |
| * |
| * @param fhirStore the fhir store |
| * @param tempGcsPath the temp gcs path |
| * @param deadLetterGcsPath the dead letter gcs path |
| */ |
| WriteBundlesToFilesFn(String fhirStore, String tempGcsPath, String deadLetterGcsPath) { |
| this.fhirStore = StaticValueProvider.of(fhirStore); |
| this.tempGcsPath = StaticValueProvider.of(tempGcsPath); |
| this.deadLetterGcsPath = StaticValueProvider.of(deadLetterGcsPath); |
| } |
| |
| /** |
| * Init client. |
| * |
| * @throws IOException the io exception |
| */ |
| @Setup |
| public void initClient() throws IOException { |
| this.client = new HttpHealthcareApiClient(); |
| } |
| |
| /** |
| * Init batch. |
| * |
| * @throws IOException the io exception |
| */ |
| @StartBundle |
| public void initFile() throws IOException { |
| // Write each bundle to newline delimited JSON file. |
| String filename = String.format("fhirImportBatch-%s.ndjson", UUID.randomUUID().toString()); |
| ResourceId tempDir = FileSystems.matchNewResource(this.tempGcsPath.get(), true); |
| this.resourceId = tempDir.resolve(filename, StandardResolveOptions.RESOLVE_FILE); |
| this.ndJsonChannel = FileSystems.create(resourceId, "application/ld+json"); |
| if (mapper == null) { |
| this.mapper = new ObjectMapper(); |
| } |
| } |
| |
| /** |
| * Add to batch. |
| * |
| * @param context the context |
| * @throws IOException the io exception |
| */ |
| @ProcessElement |
| public void addToFile(ProcessContext context, BoundedWindow window) throws IOException { |
| this.window = window; |
| String httpBody = context.element(); |
| try { |
| // This will error if not valid JSON an convert Pretty JSON to raw JSON. |
| Object data = this.mapper.readValue(httpBody, Object.class); |
| String ndJson = this.mapper.writeValueAsString(data) + "\n"; |
| this.ndJsonChannel.write(ByteBuffer.wrap(ndJson.getBytes(StandardCharsets.UTF_8))); |
| } catch (JsonProcessingException e) { |
| String resource = |
| String.format( |
| "Failed to parse payload: %s as json at: %s : %s." |
| + "Dropping message from batch import.", |
| httpBody.toString(), e.getLocation().getCharOffset(), e.getMessage()); |
| LOG.warn(resource); |
| context.output( |
| Write.FAILED_BODY, HealthcareIOError.of(httpBody, new IOException(resource))); |
| } |
| } |
| |
| /** |
| * Close file. |
| * |
| * @param context the context |
| * @throws IOException the io exception |
| */ |
| @FinishBundle |
| public void closeFile(FinishBundleContext context) throws IOException { |
| // Write the file with all elements in this bundle to GCS. |
| ndJsonChannel.close(); |
| context.output(resourceId, window.maxTimestamp(), window); |
| } |
| } |
| |
| /** Import batches of new line delimited json files to FHIR Store. */ |
| static class ImportFn |
| extends DoFn<KV<Integer, Iterable<ResourceId>>, HealthcareIOError<String>> { |
| |
| private static final Counter IMPORT_ERRORS = |
| Metrics.counter(ImportFn.class, BASE_METRIC_PREFIX + "resources_imported_failure_count"); |
| private static final Counter IMPORT_SUCCESS = |
| Metrics.counter(ImportFn.class, BASE_METRIC_PREFIX + "resources_imported_success_count"); |
| private static final Logger LOG = LoggerFactory.getLogger(ImportFn.class); |
| private final ValueProvider<String> tempGcsPath; |
| private final ValueProvider<String> deadLetterGcsPath; |
| private ResourceId tempDir; |
| private final ContentStructure contentStructure; |
| private HealthcareApiClient client; |
| private final ValueProvider<String> fhirStore; |
| |
| ImportFn( |
| ValueProvider<String> fhirStore, |
| ValueProvider<String> tempGcsPath, |
| ValueProvider<String> deadLetterGcsPath, |
| @Nullable ContentStructure contentStructure) { |
| this.fhirStore = fhirStore; |
| this.tempGcsPath = tempGcsPath; |
| this.deadLetterGcsPath = deadLetterGcsPath; |
| if (contentStructure == null) { |
| this.contentStructure = ContentStructure.CONTENT_STRUCTURE_UNSPECIFIED; |
| } else { |
| this.contentStructure = contentStructure; |
| } |
| } |
| |
| @Setup |
| public void init() throws IOException { |
| tempDir = |
| FileSystems.matchNewResource(tempGcsPath.get(), true) |
| .resolve( |
| String.format("tmp-%s", UUID.randomUUID().toString()), |
| StandardResolveOptions.RESOLVE_DIRECTORY); |
| client = new HttpHealthcareApiClient(); |
| } |
| |
| /** |
| * Move files to a temporary subdir (to provide common prefix) to execute import with single |
| * GCS URI. |
| */ |
| @ProcessElement |
| public void importBatch( |
| @Element KV<Integer, Iterable<ResourceId>> element, |
| OutputReceiver<HealthcareIOError<String>> output) |
| throws IOException { |
| Iterable<ResourceId> batch = element.getValue(); |
| List<ResourceId> tempDestinations = new ArrayList<>(); |
| List<ResourceId> deadLetterDestinations = new ArrayList<>(); |
| assert batch != null; |
| for (ResourceId file : batch) { |
| tempDestinations.add( |
| tempDir.resolve(file.getFilename(), StandardResolveOptions.RESOLVE_FILE)); |
| deadLetterDestinations.add( |
| FileSystems.matchNewResource(deadLetterGcsPath.get(), true) |
| .resolve(file.getFilename(), StandardResolveOptions.RESOLVE_FILE)); |
| } |
| // Ignore missing files since this might be a retry, which means files |
| // should have been copied over. |
| FileSystems.copy( |
| ImmutableList.copyOf(batch), |
| tempDestinations, |
| StandardMoveOptions.IGNORE_MISSING_FILES); |
| // Check whether any temporary files are not present. |
| boolean hasMissingFile = |
| FileSystems.matchResources(tempDestinations).stream() |
| .anyMatch((MatchResult r) -> r.status() != Status.OK); |
| if (hasMissingFile) { |
| throw new IllegalStateException("Not all temporary files are present for importing."); |
| } |
| ResourceId importUri = tempDir.resolve("*", StandardResolveOptions.RESOLVE_FILE); |
| try { |
| // Blocking fhirStores.import request. |
| assert contentStructure != null; |
| Operation operation = |
| client.importFhirResource( |
| fhirStore.get(), importUri.toString(), contentStructure.name()); |
| operation = client.pollOperation(operation, 500L); |
| incrementLroCounters(operation, IMPORT_SUCCESS, IMPORT_ERRORS); |
| // Clean up temp files on GCS as they we successfully imported to FHIR store and no longer |
| // needed. |
| FileSystems.delete(tempDestinations); |
| } catch (IOException | InterruptedException e) { |
| ResourceId deadLetterResourceId = |
| FileSystems.matchNewResource(deadLetterGcsPath.get(), true); |
| LOG.warn( |
| String.format( |
| "Failed to import %s with error: %s. Moving to deadletter path %s", |
| importUri.toString(), e.getMessage(), deadLetterResourceId.toString())); |
| FileSystems.rename(tempDestinations, deadLetterDestinations); |
| output.output(HealthcareIOError.of(importUri.toString(), e)); |
| } finally { |
| // If we've reached this point files have either been successfully import to FHIR store |
| // or moved to Dead Letter Queue. |
| // Clean up original files for this batch on GCS. |
| FileSystems.delete(ImmutableList.copyOf(batch)); |
| } |
| } |
| } |
| |
| /** The enum Content structure. */ |
| public enum ContentStructure { |
| /** If the content structure is not specified, the default value BUNDLE will be used. */ |
| CONTENT_STRUCTURE_UNSPECIFIED, |
| /** |
| * The source file contains one or more lines of newline-delimited JSON (ndjson). Each line is |
| * a bundle, which contains one or more resources. Set the bundle type to history to import |
| * resource versions. |
| */ |
| BUNDLE, |
| /** |
| * The source file contains one or more lines of newline-delimited JSON (ndjson). Each line is |
| * a single resource. |
| */ |
| RESOURCE, |
| /** The entire file is one JSON bundle. The JSON can span multiple lines. */ |
| BUNDLE_PRETTY, |
| /** The entire file is one JSON resource. The JSON can span multiple lines. */ |
| RESOURCE_PRETTY |
| } |
| } |
| |
| /** The type Execute bundles. */ |
| public static class ExecuteBundles extends PTransform<PCollection<String>, Write.Result> { |
| private final ValueProvider<String> fhirStore; |
| |
| /** |
| * Instantiates a new Execute bundles. |
| * |
| * @param fhirStore the fhir store |
| */ |
| ExecuteBundles(ValueProvider<String> fhirStore) { |
| this.fhirStore = fhirStore; |
| } |
| |
| /** |
| * Instantiates a new Execute bundles. |
| * |
| * @param fhirStore the fhir store |
| */ |
| ExecuteBundles(String fhirStore) { |
| this.fhirStore = StaticValueProvider.of(fhirStore); |
| } |
| |
| @Override |
| public FhirIO.Write.Result expand(PCollection<String> input) { |
| PCollectionTuple bodies = |
| input.apply( |
| ParDo.of(new ExecuteBundlesFn(fhirStore)) |
| .withOutputTags(Write.SUCCESSFUL_BODY, TupleTagList.of(Write.FAILED_BODY))); |
| bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of()); |
| bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); |
| return Write.Result.in(input.getPipeline(), bodies); |
| } |
| |
| /** The type Write Fhir fn. */ |
| static class ExecuteBundlesFn extends DoFn<String, String> { |
| |
| private static final Counter EXECUTE_BUNDLE_ERRORS = |
| Metrics.counter( |
| ExecuteBundlesFn.class, BASE_METRIC_PREFIX + "execute_bundle_error_count"); |
| private static final Counter EXECUTE_BUNDLE_SUCCESS = |
| Metrics.counter( |
| ExecuteBundlesFn.class, BASE_METRIC_PREFIX + "execute_bundle_success_count"); |
| private static final Distribution EXECUTE_BUNDLE_LATENCY_MS = |
| Metrics.distribution( |
| ExecuteBundlesFn.class, BASE_METRIC_PREFIX + "execute_bundle_latency_ms"); |
| |
| private transient HealthcareApiClient client; |
| private final ObjectMapper mapper = new ObjectMapper(); |
| /** The Fhir store. */ |
| private final ValueProvider<String> fhirStore; |
| |
| /** |
| * Instantiates a new Write Fhir fn. |
| * |
| * @param fhirStore the Fhir store |
| */ |
| ExecuteBundlesFn(ValueProvider<String> fhirStore) { |
| this.fhirStore = fhirStore; |
| } |
| |
| /** |
| * Initialize healthcare client. |
| * |
| * @throws IOException If the Healthcare client cannot be created. |
| */ |
| @Setup |
| public void initClient() throws IOException { |
| this.client = new HttpHealthcareApiClient(); |
| } |
| |
| @ProcessElement |
| public void executeBundles(ProcessContext context) { |
| String body = context.element(); |
| try { |
| long startTime = Instant.now().toEpochMilli(); |
| // Validate that data was set to valid JSON. |
| mapper.readTree(body); |
| client.executeFhirBundle(fhirStore.get(), body); |
| EXECUTE_BUNDLE_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime); |
| EXECUTE_BUNDLE_SUCCESS.inc(); |
| context.output(Write.SUCCESSFUL_BODY, body); |
| } catch (IOException | HealthcareHttpException e) { |
| EXECUTE_BUNDLE_ERRORS.inc(); |
| context.output(Write.FAILED_BODY, HealthcareIOError.of(body, e)); |
| } |
| } |
| } |
| } |
| |
| /** Export FHIR resources from a FHIR store to new line delimited json files on GCS. */ |
| public static class Export extends PTransform<PBegin, PCollection<String>> { |
| private final ValueProvider<String> fhirStore; |
| private final ValueProvider<String> exportGcsUriPrefix; |
| |
| public Export(ValueProvider<String> fhirStore, ValueProvider<String> exportGcsUriPrefix) { |
| this.fhirStore = fhirStore; |
| this.exportGcsUriPrefix = exportGcsUriPrefix; |
| } |
| |
| @Override |
| public PCollection<String> expand(PBegin input) { |
| return input |
| .apply(Create.ofProvider(fhirStore, StringUtf8Coder.of())) |
| .apply( |
| "ScheduleExportOperations", |
| ParDo.of(new ExportResourcesToGcsFn(this.exportGcsUriPrefix))) |
| .apply(FileIO.matchAll()) |
| .apply(FileIO.readMatches()) |
| .apply("ReadResourcesFromFiles", TextIO.readFiles()); |
| } |
| |
| /** A function that schedules an export operation and monitors the status. */ |
| public static class ExportResourcesToGcsFn extends DoFn<String, String> { |
| |
| private static final Counter EXPORT_ERRORS = |
| Metrics.counter( |
| ExportResourcesToGcsFn.class, |
| BASE_METRIC_PREFIX + "resources_exported_failure_count"); |
| private static final Counter EXPORT_SUCCESS = |
| Metrics.counter( |
| ExportResourcesToGcsFn.class, |
| BASE_METRIC_PREFIX + "resources_exported_success_count"); |
| private HealthcareApiClient client; |
| private final ValueProvider<String> exportGcsUriPrefix; |
| |
| public ExportResourcesToGcsFn(ValueProvider<String> exportGcsUriPrefix) { |
| this.exportGcsUriPrefix = exportGcsUriPrefix; |
| } |
| |
| @Setup |
| public void initClient() throws IOException { |
| this.client = new HttpHealthcareApiClient(); |
| } |
| |
| @ProcessElement |
| public void exportResourcesToGcs(ProcessContext context) |
| throws IOException, InterruptedException, HealthcareHttpException { |
| String fhirStore = context.element(); |
| String gcsPrefix = this.exportGcsUriPrefix.get(); |
| Operation operation = client.exportFhirResourceToGcs(fhirStore, gcsPrefix); |
| operation = client.pollOperation(operation, 1000L); |
| if (operation.getError() != null) { |
| throw new RuntimeException( |
| String.format("Export operation (%s) failed.", operation.getName())); |
| } |
| incrementLroCounters(operation, EXPORT_SUCCESS, EXPORT_ERRORS); |
| context.output(String.format("%s/*", gcsPrefix.replaceAll("/+$", ""))); |
| } |
| } |
| } |
| |
| /** Deidentify FHIR resources from a FHIR store to a destination FHIR store. */ |
| public static class Deidentify extends PTransform<PBegin, PCollection<String>> { |
| private final ValueProvider<String> sourceFhirStore; |
| private final ValueProvider<String> destinationFhirStore; |
| private final ValueProvider<DeidentifyConfig> deidConfig; |
| |
| public Deidentify( |
| ValueProvider<String> sourceFhirStore, |
| ValueProvider<String> destinationFhirStore, |
| ValueProvider<DeidentifyConfig> deidConfig) { |
| this.sourceFhirStore = sourceFhirStore; |
| this.destinationFhirStore = destinationFhirStore; |
| this.deidConfig = deidConfig; |
| } |
| |
| @Override |
| public PCollection<String> expand(PBegin input) { |
| return input |
| .getPipeline() |
| .apply(Create.ofProvider(sourceFhirStore, StringUtf8Coder.of())) |
| .apply( |
| "ScheduleDeidentifyFhirStoreOperations", |
| ParDo.of(new DeidentifyFn(destinationFhirStore, deidConfig))); |
| } |
| |
| /** A function that schedules a deidentify operation and monitors the status. */ |
| public static class DeidentifyFn extends DoFn<String, String> { |
| |
| private static final Counter DEIDENTIFY_ERRORS = |
| Metrics.counter( |
| DeidentifyFn.class, BASE_METRIC_PREFIX + "resources_deidentified_failure_count"); |
| private static final Counter DEIDENTIFY_SUCCESS = |
| Metrics.counter( |
| DeidentifyFn.class, BASE_METRIC_PREFIX + "resources_deidentified_success_count"); |
| private HealthcareApiClient client; |
| private final ValueProvider<String> destinationFhirStore; |
| private static final Gson gson = new Gson(); |
| private final String deidConfigJson; |
| |
| public DeidentifyFn( |
| ValueProvider<String> destinationFhirStore, ValueProvider<DeidentifyConfig> deidConfig) { |
| this.destinationFhirStore = destinationFhirStore; |
| this.deidConfigJson = gson.toJson(deidConfig.get()); |
| } |
| |
| @Setup |
| public void initClient() throws IOException { |
| this.client = new HttpHealthcareApiClient(); |
| } |
| |
| @ProcessElement |
| public void deidentify(ProcessContext context) |
| throws IOException, InterruptedException, HealthcareHttpException { |
| String sourceFhirStore = context.element(); |
| String destinationFhirStore = this.destinationFhirStore.get(); |
| DeidentifyConfig deidConfig = gson.fromJson(this.deidConfigJson, DeidentifyConfig.class); |
| Operation operation = |
| client.deidentifyFhirStore(sourceFhirStore, destinationFhirStore, deidConfig); |
| operation = client.pollOperation(operation, 1000L); |
| if (operation.getError() != null) { |
| throw new IOException( |
| String.format("DeidentifyFhirStore operation (%s) failed.", operation.getName())); |
| } |
| incrementLroCounters(operation, DEIDENTIFY_SUCCESS, DEIDENTIFY_ERRORS); |
| context.output(destinationFhirStore); |
| } |
| } |
| } |
| |
| /** The type Search. */ |
| public static class Search<T> |
| extends PTransform<PCollection<FhirSearchParameter<T>>, FhirIO.Search.Result> { |
| private static final Logger LOG = LoggerFactory.getLogger(Search.class); |
| |
| private final ValueProvider<String> fhirStore; |
| |
| Search(ValueProvider<String> fhirStore) { |
| this.fhirStore = fhirStore; |
| } |
| |
| Search(String fhirStore) { |
| this.fhirStore = StaticValueProvider.of(fhirStore); |
| } |
| |
| public static class Result implements POutput, PInput { |
| private PCollection<KV<String, JsonArray>> keyedResources; |
| private PCollection<JsonArray> resources; |
| |
| private PCollection<HealthcareIOError<String>> failedSearches; |
| PCollectionTuple pct; |
| |
| /** |
| * Create FhirIO.Search.Result form PCollectionTuple with OUT and DEAD_LETTER tags. |
| * |
| * @param pct the pct |
| * @return the search result |
| * @throws IllegalArgumentException the illegal argument exception |
| */ |
| static FhirIO.Search.Result of(PCollectionTuple pct) throws IllegalArgumentException { |
| if (pct.has(OUT) && pct.has(DEAD_LETTER)) { |
| return new FhirIO.Search.Result(pct); |
| } else { |
| throw new IllegalArgumentException( |
| "The PCollection tuple must have the FhirIO.Search.OUT " |
| + "and FhirIO.Search.DEAD_LETTER tuple tags"); |
| } |
| } |
| |
| private Result(PCollectionTuple pct) { |
| this.pct = pct; |
| this.keyedResources = |
| pct.get(OUT).setCoder(KvCoder.of(StringUtf8Coder.of(), JsonArrayCoder.of())); |
| this.resources = |
| this.keyedResources |
| .apply( |
| "Extract Values", |
| MapElements.into(TypeDescriptor.of(JsonArray.class)) |
| .via((KV<String, JsonArray> in) -> in.getValue())) |
| .setCoder(JsonArrayCoder.of()); |
| this.failedSearches = |
| pct.get(DEAD_LETTER).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); |
| } |
| |
| /** |
| * Gets failed searches. |
| * |
| * @return the failed searches |
| */ |
| public PCollection<HealthcareIOError<String>> getFailedSearches() { |
| return failedSearches; |
| } |
| |
| /** |
| * Gets resources. |
| * |
| * @return the resources |
| */ |
| public PCollection<JsonArray> getResources() { |
| return resources; |
| } |
| |
| /** |
| * Gets resources with input SearchParameter key. |
| * |
| * @return the resources with input SearchParameter key. |
| */ |
| public PCollection<KV<String, JsonArray>> getKeyedResources() { |
| return keyedResources; |
| } |
| |
| @Override |
| public Pipeline getPipeline() { |
| return this.pct.getPipeline(); |
| } |
| |
| @Override |
| public Map<TupleTag<?>, PValue> expand() { |
| return ImmutableMap.of(OUT, keyedResources); |
| } |
| |
| @Override |
| public void finishSpecifyingOutput( |
| String transformName, PInput input, PTransform<?, ?> transform) {} |
| } |
| |
| /** The tag for the main output of Fhir Messages. */ |
| public static final TupleTag<KV<String, JsonArray>> OUT = |
| new TupleTag<KV<String, JsonArray>>() {}; |
| /** The tag for the deadletter output of Fhir Messages. */ |
| public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER = |
| new TupleTag<HealthcareIOError<String>>() {}; |
| |
| @Override |
| public FhirIO.Search.Result expand(PCollection<FhirSearchParameter<T>> input) { |
| return input.apply("Fetch Fhir messages", new SearchResourcesJsonString(this.fhirStore)); |
| } |
| |
| /** |
| * DoFn to fetch resources from an Google Cloud Healthcare FHIR store based on search request |
| * |
| * <p>This DoFn consumes a {@link PCollection} of search requests consisting of resource type |
| * and search parameters, and fetches all matching resources based on the search criteria and |
| * will output a {@link PCollectionTuple} which contains the output and dead-letter {@link |
| * PCollection}*. |
| * |
| * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}: |
| * |
| * <ul> |
| * <li>{@link FhirIO.Search#OUT} - Contains all {@link PCollection} records successfully |
| * search from the Fhir store. |
| * <li>{@link FhirIO.Search#DEAD_LETTER} - Contains all {@link PCollection} of {@link |
| * HealthcareIOError}* of failed searches from the Fhir store, with error message and |
| * stacktrace. |
| * </ul> |
| */ |
| class SearchResourcesJsonString |
| extends PTransform<PCollection<FhirSearchParameter<T>>, FhirIO.Search.Result> { |
| |
| private final ValueProvider<String> fhirStore; |
| |
| public SearchResourcesJsonString(ValueProvider<String> fhirStore) { |
| this.fhirStore = fhirStore; |
| } |
| |
| @Override |
| public FhirIO.Search.Result expand(PCollection<FhirSearchParameter<T>> resourceIds) { |
| return new FhirIO.Search.Result( |
| resourceIds.apply( |
| ParDo.of(new SearchResourcesFn(this.fhirStore)) |
| .withOutputTags( |
| FhirIO.Search.OUT, TupleTagList.of(FhirIO.Search.DEAD_LETTER)))); |
| } |
| |
| /** DoFn for searching messages from the Fhir store with error handling. */ |
| class SearchResourcesFn extends DoFn<FhirSearchParameter<T>, KV<String, JsonArray>> { |
| |
| private final Counter searchResourceErrors = |
| Metrics.counter( |
| SearchResourcesFn.class, BASE_METRIC_PREFIX + "search_resource_error_count"); |
| private final Counter searchResourceSuccess = |
| Metrics.counter( |
| SearchResourcesFn.class, BASE_METRIC_PREFIX + "search_resource_success_count"); |
| private final Distribution searchResourceLatencyMs = |
| Metrics.distribution( |
| SearchResourcesFn.class, BASE_METRIC_PREFIX + "search_resource_latency_ms"); |
| |
| private final Logger log = LoggerFactory.getLogger(SearchResourcesFn.class); |
| private HealthcareApiClient client; |
| private final ValueProvider<String> fhirStore; |
| |
| /** Instantiates a new Fhir resources search fn. */ |
| SearchResourcesFn(ValueProvider<String> fhirStore) { |
| this.fhirStore = fhirStore; |
| } |
| |
| /** |
| * Instantiate healthcare client. |
| * |
| * @throws IOException the io exception |
| */ |
| @Setup |
| public void instantiateHealthcareClient() throws IOException { |
| this.client = new HttpHealthcareApiClient(); |
| } |
| |
| /** |
| * Process element. |
| * |
| * @param context the context |
| */ |
| @ProcessElement |
| public void processElement(ProcessContext context) { |
| FhirSearchParameter<T> fhirSearchParameters = context.element(); |
| try { |
| context.output( |
| KV.of( |
| fhirSearchParameters.getKey(), |
| searchResources( |
| this.client, |
| this.fhirStore.toString(), |
| fhirSearchParameters.getResourceType(), |
| fhirSearchParameters.getQueries()))); |
| } catch (IllegalArgumentException | NoSuchElementException e) { |
| searchResourceErrors.inc(); |
| log.warn( |
| String.format( |
| "Error search FHIR messages writing to Dead Letter " |
| + "Queue. Cause: %s Stack Trace: %s", |
| e.getMessage(), Throwables.getStackTraceAsString(e))); |
| context.output( |
| FhirIO.Search.DEAD_LETTER, HealthcareIOError.of(this.fhirStore.toString(), e)); |
| } |
| } |
| |
| private JsonArray searchResources( |
| HealthcareApiClient client, |
| String fhirStore, |
| String resourceType, |
| @Nullable Map<String, T> parameters) |
| throws NoSuchElementException { |
| long start = Instant.now().toEpochMilli(); |
| |
| HashMap<String, Object> parameterObjects = new HashMap<>(); |
| if (parameters != null) { |
| parameters.forEach(parameterObjects::put); |
| } |
| HttpHealthcareApiClient.FhirResourcePages.FhirResourcePagesIterator iter = |
| new HttpHealthcareApiClient.FhirResourcePages.FhirResourcePagesIterator( |
| client, fhirStore, resourceType, parameterObjects); |
| JsonArray result = new JsonArray(); |
| while (iter.hasNext()) { |
| result.addAll(iter.next()); |
| } |
| searchResourceLatencyMs.update(java.time.Instant.now().toEpochMilli() - start); |
| searchResourceSuccess.inc(); |
| return result; |
| } |
| } |
| } |
| } |
| } |