blob: 4b7757bbf95165e3e1a095b20308f4c30e7dd2c1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package fhirio provides an API for reading and writing resources to Google
// Cloud Healthcare Fhir stores.
// Experimental.
package fhirio
import (
"bytes"
"context"
"encoding/json"
"net/http"
"regexp"
"strings"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)
const (
bundleResponseTypeBatch = "batch-response"
bundleResponseTypeTransaction = "transaction-response"
)
func init() {
register.DoFn4x0[context.Context, []byte, func(string), func(string)]((*executeBundleFn)(nil))
register.Emitter1[string]()
}
type executeBundleFn struct {
fnCommonVariables
successesCount beam.Counter
// Path to FHIR store where bundle requests will be executed on.
FhirStorePath string
}
func (fn executeBundleFn) String() string {
return "executeBundleFn"
}
func (fn *executeBundleFn) Setup() {
fn.fnCommonVariables.setup(fn.String())
fn.successesCount = beam.NewCounter(fn.String(), baseMetricPrefix+"success_count")
}
func (fn *executeBundleFn) ProcessElement(ctx context.Context, inputBundleBody []byte, emitSuccess, emitFailure func(string)) {
response, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() (*http.Response, error) {
return fn.client.executeBundle(fn.FhirStorePath, inputBundleBody)
})
if err != nil {
fn.resourcesErrorCount.Inc(ctx, 1)
emitFailure(errors.Wrap(err, "execute bundle request returned error").Error())
return
}
body, err := extractBodyFrom(response)
if err != nil {
fn.resourcesErrorCount.Inc(ctx, 1)
emitFailure(errors.Wrap(err, "could not extract body from execute bundles response").Error())
return
}
fn.processResponseBody(ctx, body, emitSuccess, emitFailure)
}
func (fn *executeBundleFn) processResponseBody(ctx context.Context, body string, emitSuccess, emitFailure func(string)) {
var bodyFields struct {
Type string `json:"type"`
Entries []interface{} `json:"entry"`
}
err := json.NewDecoder(strings.NewReader(body)).Decode(&bodyFields)
if err != nil {
fn.resourcesErrorCount.Inc(ctx, 1)
emitFailure(errors.Wrap(err, "could not parse body from execute bundle response").Error())
return
}
if bodyFields.Entries == nil {
return
}
// A BATCH bundle returns a success response even if entries have failures, as
// entries are executed separately. However, TRANSACTION bundles should return
// error response (in client.executeBundle call) if any entry fails. Therefore,
// for BATCH bundles we need to parse the error and success counters.
switch bodyFields.Type {
case bundleResponseTypeTransaction:
fn.resourcesSuccessCount.Inc(ctx, int64(len(bodyFields.Entries)))
emitSuccess(body)
case bundleResponseTypeBatch:
for _, entry := range bodyFields.Entries {
var entryFields struct {
Response struct {
Status string `json:"status"`
} `json:"response"`
}
entryBytes, _ := json.Marshal(entry)
_ = json.NewDecoder(bytes.NewReader(entryBytes)).Decode(&entryFields)
if entryFields.Response.Status == "" {
continue
}
if batchResponseStatusIsBad(entryFields.Response.Status) {
fn.resourcesErrorCount.Inc(ctx, 1)
emitFailure(errors.Errorf("execute bundles entry contains bad status: [%v]", entryFields.Response.Status).Error())
} else {
fn.resourcesSuccessCount.Inc(ctx, 1)
emitSuccess(string(entryBytes))
}
}
}
fn.successesCount.Inc(ctx, 1)
}
func batchResponseStatusIsBad(status string) bool {
// 2XXs are successes, otherwise failure.
isMatch, err := regexp.MatchString("^2\\d{2}", status)
if err != nil {
return true
}
return !isMatch
}
// ExecuteBundles performs all the requests in the specified bundles on a given
// FHIR store. This transform takes a path to a FHIR store and a PCollection of
// bundles as JSON-encoded strings. It executes the requests defined on the
// bundles on the FHIR store located on the provided path. It outputs two
// PCollection<string>, the first containing the response bodies of the
// successfully performed requests and the second one error messages of the
// requests that failed to be executed.
// See: https://cloud.google.com/healthcare-api/docs/samples/healthcare-fhir-execute-bundle
func ExecuteBundles(s beam.Scope, fhirStorePath string, bundles beam.PCollection) (beam.PCollection, beam.PCollection) {
s = s.Scope("fhirio.ExecuteBundles")
return executeBundles(s, fhirStorePath, bundles, nil)
}
// This is useful as an entry point for testing because we can provide a fake FHIR store client.
func executeBundles(s beam.Scope, fhirStorePath string, bundles beam.PCollection, client fhirStoreClient) (beam.PCollection, beam.PCollection) {
return beam.ParDo2(s, &executeBundleFn{fnCommonVariables: fnCommonVariables{client: client}, FhirStorePath: fhirStorePath}, bundles)
}