blob: b2a26c7a69d8f4847301436df8b01693ae4c5436 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package fhirio provides an API for reading and writing resources to Google
// Cloud Healthcare Fhir stores.
// Experimental.
package fhirio
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"strings"
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"google.golang.org/api/googleapi"
"google.golang.org/api/healthcare/v1"
"google.golang.org/api/option"
)
const (
UserAgent = "apache-beam-io-google-cloud-platform-healthcare/" + core.SdkVersion
baseMetricPrefix = "fhirio/"
errorCounterName = baseMetricPrefix + "resource_error_count"
operationErrorCounterName = baseMetricPrefix + "operation_error_count"
operationSuccessCounterName = baseMetricPrefix + "operation_success_count"
successCounterName = baseMetricPrefix + "resource_success_count"
pageTokenParameterKey = "_page_token"
)
var backoffDuration = [...]time.Duration{time.Second, 5 * time.Second, 10 * time.Second, 15 * time.Second}
func executeAndRecordLatency[T any](ctx context.Context, latencyMs *beam.Distribution, executionSupplier func() (T, error)) (T, error) {
timeBeforeReadRequest := time.Now()
result, err := executionSupplier()
latencyMs.Update(ctx, time.Since(timeBeforeReadRequest).Milliseconds())
return result, err
}
func extractBodyFrom(response *http.Response) (string, error) {
err := googleapi.CheckResponse(response)
if err != nil {
return "", errors.Wrapf(err, "response contains bad status: [%v]", response.Status)
}
bodyBytes, err := io.ReadAll(response.Body)
if err != nil {
return "", err
}
return string(bodyBytes), nil
}
type operationCounters struct {
successCount, errorCount beam.Counter
}
func (c *operationCounters) setup(namespace string) {
c.successCount = beam.NewCounter(namespace, operationSuccessCounterName)
c.errorCount = beam.NewCounter(namespace, operationErrorCounterName)
}
type operationResults struct {
Successes int64 `json:"success,string"`
Failures int64 `json:"failure,string"`
}
type fhirStoreClient interface {
readResource(resourcePath []byte) (*http.Response, error)
executeBundle(storePath string, bundle string) (*http.Response, error)
search(storePath, resourceType string, queries map[string]string, pageToken string) (*http.Response, error)
deidentify(srcStorePath, dstStorePath string, deidConfig *healthcare.DeidentifyConfig) (operationResults, error)
importResources(storePath, gcsURI string, contentStructure ContentStructure) (operationResults, error)
}
type fhirStoreClientImpl struct {
healthcareService *healthcare.Service
}
func newFhirStoreClient() *fhirStoreClientImpl {
healthcareService, err := healthcare.NewService(context.Background(), option.WithUserAgent(UserAgent))
if err != nil {
panic("Failed to initialize Google Cloud Healthcare Service. Reason: " + err.Error())
}
return &fhirStoreClientImpl{healthcareService}
}
func (c *fhirStoreClientImpl) fhirService() *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService {
return c.healthcareService.Projects.Locations.Datasets.FhirStores.Fhir
}
func (c *fhirStoreClientImpl) fhirStoreService() *healthcare.ProjectsLocationsDatasetsFhirStoresService {
return c.healthcareService.Projects.Locations.Datasets.FhirStores
}
func (c *fhirStoreClientImpl) readResource(resourcePath []byte) (*http.Response, error) {
return c.fhirService().Read(string(resourcePath)).Do()
}
func (c *fhirStoreClientImpl) executeBundle(storePath, bundle string) (*http.Response, error) {
return c.fhirService().ExecuteBundle(storePath, strings.NewReader(bundle)).Do()
}
func (c *fhirStoreClientImpl) search(storePath, resourceType string, queries map[string]string, pageToken string) (*http.Response, error) {
queryParams := make([]googleapi.CallOption, 0)
for key, value := range queries {
queryParams = append(queryParams, googleapi.QueryParameter(key, value))
}
if pageToken != "" {
queryParams = append(queryParams, googleapi.QueryParameter(pageTokenParameterKey, pageToken))
}
searchRequest := &healthcare.SearchResourcesRequest{}
if resourceType == "" {
return c.fhirService().Search(storePath, searchRequest).Do(queryParams...)
}
return c.fhirService().SearchType(storePath, resourceType, searchRequest).Do(queryParams...)
}
func (c *fhirStoreClientImpl) deidentify(srcStorePath, dstStorePath string, deidConfig *healthcare.DeidentifyConfig) (operationResults, error) {
deidRequest := &healthcare.DeidentifyFhirStoreRequest{
Config: deidConfig,
DestinationStore: dstStorePath,
}
operation, err := c.fhirStoreService().Deidentify(srcStorePath, deidRequest).Do()
if err != nil {
return operationResults{}, err
}
return c.pollTilCompleteAndCollectResults(operation)
}
func (c *fhirStoreClientImpl) importResources(storePath, gcsURI string, contentStructure ContentStructure) (operationResults, error) {
importRequest := &healthcare.ImportResourcesRequest{
ContentStructure: contentStructure.String(),
GcsSource: &healthcare.GoogleCloudHealthcareV1FhirGcsSource{Uri: gcsURI},
}
operation, err := c.fhirStoreService().Import(storePath, importRequest).Do()
if err != nil {
return operationResults{}, err
}
return c.pollTilCompleteAndCollectResults(operation)
}
func (c *fhirStoreClientImpl) pollTilCompleteAndCollectResults(operation *healthcare.Operation) (operationResults, error) {
operation, err := c.healthcareService.Projects.Locations.Datasets.Operations.Get(operation.Name).Do()
for i := 0; err == nil && !operation.Done; {
time.Sleep(backoffDuration[i])
if i < len(backoffDuration)-1 {
i += 1
}
operation, err = c.healthcareService.Projects.Locations.Datasets.Operations.Get(operation.Name).Do()
}
if err != nil {
return operationResults{}, err
}
if operation.Error != nil {
return operationResults{}, errors.New(operation.Error.Message)
}
return parseOperationCounterResultsFrom(operation.Metadata)
}
func parseOperationCounterResultsFrom(operationMetadata []byte) (operationResults, error) {
var operationCounterField struct {
Counter struct {
operationResults
} `json:"counter"`
}
err := json.NewDecoder(bytes.NewReader(operationMetadata)).Decode(&operationCounterField)
if err != nil {
return operationResults{}, err
}
return operationCounterField.Counter.operationResults, nil
}
type fnCommonVariables struct {
client fhirStoreClient
resourcesErrorCount beam.Counter
resourcesSuccessCount beam.Counter
latencyMs beam.Distribution
}
func (fnc *fnCommonVariables) setup(namespace string) {
if fnc.client == nil {
fnc.client = newFhirStoreClient()
}
fnc.resourcesErrorCount = beam.NewCounter(namespace, errorCounterName)
fnc.resourcesSuccessCount = beam.NewCounter(namespace, successCounterName)
fnc.latencyMs = beam.NewDistribution(namespace, baseMetricPrefix+"latency_ms")
}