blob: 0d59be4913e2972d0fa17c0f24eda557ce7b1585 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package fhirio provides an API for reading and writing resources to Google
// Cloud Healthcare FHIR stores.
// Experimental.
package fhirio
import (
"context"
"fmt"
"io"
"path/filepath"
"strings"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/google/uuid"
)
func init() {
	// Both DoFns below emit plain strings, so a single string emitter suffices.
	register.Emitter1[string]()
	// Writer stage: (ctx, resource) -> batch file path | failed resource.
	register.DoFn4x0[context.Context, string, func(string), func(string)]((*createBatchFilesFn)(nil))
	// Import stage: (ctx, batch file path) -> dead-letter message.
	register.DoFn3x0[context.Context, string, func(string)]((*importFn)(nil))
}
// ContentStructure mirrors the `contentStructure` enum of the FHIR store import
// request, as documented at:
// https://cloud.google.com/healthcare-api/docs/reference/rest/v1/projects.locations.datasets.fhirStores/import#contentstructure
type ContentStructure int

const (
	// ContentStructureUnspecified leaves the content structure unset when passed
	// to Import; the API then applies its default, BUNDLE.
	ContentStructureUnspecified ContentStructure = iota
	// ContentStructureBundle tells Import that every line of the newline-delimited
	// JSON (ndjson) source file is a bundle holding one or more resources.
	ContentStructureBundle
	// ContentStructureResource tells Import that every line of the ndjson source
	// file is exactly one resource.
	ContentStructureResource
)

// String returns the API enum name for cs. ContentStructureUnspecified, as well
// as any value outside the declared constants, is rendered as
// CONTENT_STRUCTURE_UNSPECIFIED.
func (cs ContentStructure) String() string {
	if cs == ContentStructureBundle {
		return "BUNDLE"
	}
	if cs == ContentStructureResource {
		return "RESOURCE"
	}
	return "CONTENT_STRUCTURE_UNSPECIFIED"
}
// createBatchFilesFn writes every resource of a bundle, one per line, into a
// single newline-delimited JSON (ndjson) file under TempLocation. When the
// bundle finishes, the file's path is emitted on the first output; resources
// that could not be written are emitted on the second output.
type createBatchFilesFn struct {
	// fs, batchFileWriter and batchFilePath are per-bundle state, set in
	// StartBundle and released in FinishBundle.
	fs              filesystem.Interface
	batchFileWriter io.WriteCloser
	batchFilePath   string
	// TempLocation is the directory in which batch files are created.
	TempLocation string
}

// StartBundle opens a new, uniquely named batch file under TempLocation.
func (fn *createBatchFilesFn) StartBundle(ctx context.Context, _, _ func(string)) error {
	fs, err := filesystem.New(ctx, fn.TempLocation)
	if err != nil {
		return err
	}
	fn.fs = fs
	fn.batchFilePath = fmt.Sprintf("%s/fhirImportBatch-%v.ndjson", fn.TempLocation, uuid.New())
	log.Infof(ctx, "Opening to write batch file: %v", fn.batchFilePath)
	fn.batchFileWriter, err = fn.fs.OpenWrite(ctx, fn.batchFilePath)
	if err != nil {
		// The bundle fails here, so release the filesystem handle now rather
		// than leaking it.
		fn.fs.Close()
		fn.fs = nil
		return err
	}
	return nil
}

// ProcessElement appends resource as one line of the batch file. If the write
// fails, the resource is emitted on the failed-resources output instead.
func (fn *createBatchFilesFn) ProcessElement(ctx context.Context, resource string, _, emitFailedResource func(string)) {
	if _, err := fn.batchFileWriter.Write([]byte(resource + "\n")); err != nil {
		log.Warnf(ctx, "Failed to write resource to batch file. Reason: %v", err)
		emitFailedResource(resource)
	}
}

// FinishBundle closes the batch file and, only if that succeeds, emits its path
// for import. A failed Close means the file may be incomplete or absent, so the
// error is returned to fail the bundle. Otherwise the path of a broken file
// would be emitted and the resources written to it would be lost silently.
func (fn *createBatchFilesFn) FinishBundle(ctx context.Context, emitBatchFilePath, _ func(string)) error {
	closeErr := fn.batchFileWriter.Close()
	fn.batchFileWriter = nil
	if err := fn.fs.Close(); err != nil {
		// Best effort: the batch file itself is already closed at this point.
		log.Warnf(ctx, "Failed to close filesystem for %v. Reason: %v", fn.TempLocation, err)
	}
	fn.fs = nil
	if closeErr != nil {
		return fmt.Errorf("closing batch file %v: %w", fn.batchFilePath, closeErr)
	}
	emitBatchFilePath(fn.batchFilePath)
	log.Infof(ctx, "Batch file created: %v", fn.batchFilePath)
	return nil
}
// importFn moves the batch files of a bundle into a bundle-private temporary
// directory under TempLocation. When the bundle finishes, it imports that
// directory into the FHIR store with a single import operation. Failures are
// reported as messages on its only (dead-letter) output.
type importFn struct {
	fnCommonVariables
	operationCounters
	// Per-bundle state, set in StartBundle and released in FinishBundle.
	fs             filesystem.Interface
	batchFilesPath []string
	tempBatchDir   string
	// Configuration fixed at pipeline construction time.
	FhirStorePath                    string
	TempLocation, DeadLetterLocation string
	ContentStructure                 ContentStructure
}

func (fn importFn) String() string {
	return "importFn"
}

// Setup initializes the shared client state and the operation counters.
func (fn *importFn) Setup() {
	fn.fnCommonVariables.setup(fn.String())
	fn.operationCounters.setup(fn.String())
}

// StartBundle opens the filesystem and picks a fresh temporary directory that
// collects this bundle's batch files.
func (fn *importFn) StartBundle(ctx context.Context, _ func(string)) error {
	fs, err := filesystem.New(ctx, fn.TempLocation)
	if err != nil {
		return err
	}
	fn.fs = fs
	fn.batchFilesPath = make([]string, 0)
	fn.tempBatchDir = fmt.Sprintf("%s/tmp-%v", fn.TempLocation, uuid.New())
	return nil
}

// ProcessElement moves a batch file into this bundle's temporary directory so
// that it is matched by the import URI built in FinishBundle. If the move
// fails, the file would not be imported. It is therefore left untouched and
// untracked (so it is never deleted here), and a dead-letter message is emitted
// so the failure is not silent.
func (fn *importFn) ProcessElement(ctx context.Context, batchFilePath string, emitDeadLetter func(string)) {
	updatedBatchFilePath := fmt.Sprintf("%s/%s", fn.tempBatchDir, filepath.Base(batchFilePath))
	if err := filesystem.Rename(ctx, fn.fs, batchFilePath, updatedBatchFilePath); err != nil {
		deadLetterMessage := fmt.Sprintf("Failed to move %v to temp location. Reason: %v", batchFilePath, err)
		log.Warn(ctx, deadLetterMessage)
		emitDeadLetter(deadLetterMessage)
		return
	}
	fn.batchFilesPath = append(fn.batchFilesPath, updatedBatchFilePath)
}

// FinishBundle imports every file collected in tempBatchDir with one import
// operation. On success, it records the counters and deletes the imported
// files. On failure, it moves the files to DeadLetterLocation (or deletes them
// if none is set) and emits a dead-letter message.
func (fn *importFn) FinishBundle(ctx context.Context, emitDeadLetter func(string)) {
	defer func() {
		fn.fs.Close()
		fn.fs = nil
		fn.batchFilesPath = nil
	}()
	if len(fn.batchFilesPath) == 0 {
		// Nothing was moved into tempBatchDir, so the import URI would match no
		// files; skip the API call instead of reporting a spurious failure.
		return
	}
	importURI := fn.tempBatchDir + "/*.ndjson"
	log.Infof(ctx, "About to begin import operation with importURI: %v", importURI)
	result, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() (operationResults, error) {
		return fn.client.importResources(fn.FhirStorePath, importURI, fn.ContentStructure)
	})
	if err != nil {
		fn.moveToDeadLetterOrRemoveFailedImportBatchFiles(ctx)
		fn.operationCounters.errorCount.Inc(ctx, 1)
		deadLetterMessage := fmt.Sprintf("Failed to import [%v]. Reason: %v", importURI, err)
		log.Warn(ctx, deadLetterMessage)
		emitDeadLetter(deadLetterMessage)
		return
	}
	log.Infof(ctx, "Imported %v. Results: %v", importURI, result)
	fn.operationCounters.successCount.Inc(ctx, 1)
	fn.resourcesSuccessCount.Inc(ctx, result.Successes)
	fn.resourcesErrorCount.Inc(ctx, result.Failures)
	fn.removeTempBatchFiles(ctx)
}

// moveToDeadLetterOrRemoveFailedImportBatchFiles moves the tracked batch files
// into DeadLetterLocation. If no dead-letter location is configured, it deletes
// them instead. Individual failures are logged and do not stop the loop.
func (fn *importFn) moveToDeadLetterOrRemoveFailedImportBatchFiles(ctx context.Context) {
	if fn.DeadLetterLocation == "" {
		log.Info(ctx, "Deadletter path not provided. Remove failed import batch files instead.")
		fn.removeTempBatchFiles(ctx)
		return
	}
	log.Infof(ctx, "Moving failed import files to Deadletter path: [%v]", fn.DeadLetterLocation)
	for _, p := range fn.batchFilesPath {
		err := filesystem.Rename(ctx, fn.fs, p, fmt.Sprintf("%s/%s", fn.DeadLetterLocation, filepath.Base(p)))
		if err != nil {
			log.Warnf(ctx, "Failed to move failed imported file %v to %v. Reason: %v", p, fn.DeadLetterLocation, err)
		}
	}
}

// removeTempBatchFiles deletes the tracked batch files as a best effort. It
// logs each failure rather than returning it. If the filesystem does not
// support removal, it logs a warning and leaves the files in place instead of
// panicking on the type assertion.
func (fn *importFn) removeTempBatchFiles(ctx context.Context) {
	remover, ok := fn.fs.(filesystem.Remover)
	if !ok {
		log.Warnf(ctx, "Filesystem for %v does not support removal; leaving temp batch files in place: %v", fn.TempLocation, fn.batchFilesPath)
		return
	}
	for _, p := range fn.batchFilesPath {
		if err := remover.Remove(ctx, p); err != nil {
			log.Warnf(ctx, "Failed to delete temp batch file [%v]. Reason: %v", p, err)
		}
	}
}
// Import consumes FHIR resources as input PCollection<string> and imports them
// into a given Google Cloud Healthcare FHIR store. It first writes the resources
// into batch files in `tempDir` (typically a Google Cloud Storage location). It
// then imports those files into the store through the FHIR import API method:
// https://cloud.google.com/healthcare-api/docs/concepts/fhir-import.
//
// If `tempDir` is empty, Import falls back to the pipeline's temp_location
// option and then to its staging_location option. If neither is set, it panics
// at pipeline construction time. A single trailing "/" is trimmed from
// `tempDir` and `deadLetterDir`. `contentStructure` describes how each line of
// the batch files is shaped; see ContentStructure.
//
// Resources that fail to be written into a batch file are returned in the first
// output PCollection. If a batch file fails to be imported, it is moved to
// `deadLetterDir` and an error message is returned in the second output
// PCollection. If `deadLetterDir` is empty, failed import files are deleted and
// become irretrievable, but the error message is still emitted.
func Import(s beam.Scope, fhirStorePath, tempDir, deadLetterDir string, contentStructure ContentStructure, resources beam.PCollection) (beam.PCollection, beam.PCollection) {
	s = s.Scope("fhirio.Import")
	if tempDir == "" {
		tempDir = tryFallbackToDataflowTempDirOrPanic()
	}
	tempDir = strings.TrimSuffix(tempDir, "/")
	deadLetterDir = strings.TrimSuffix(deadLetterDir, "/")
	// A nil client is passed on to the import stage unchanged.
	return importResourcesInBatches(s, fhirStorePath, tempDir, deadLetterDir, contentStructure, resources, nil)
}
// importResourcesInBatches wires the two stages behind Import. The first stage
// writes resources into batch files; the second imports those files. The FHIR
// store client is taken explicitly so tests can inject a fake one. Import
// passes nil here (presumably fnCommonVariables.setup builds a real client when
// none is given — confirm).
func importResourcesInBatches(s beam.Scope, fhirStorePath, tempDir, deadLetterDir string, contentStructure ContentStructure, resources beam.PCollection, client fhirStoreClient) (beam.PCollection, beam.PCollection) {
	batcher := &createBatchFilesFn{TempLocation: tempDir}
	batchFiles, failedResources := beam.ParDo2(s, batcher, resources)

	importer := &importFn{
		fnCommonVariables:  fnCommonVariables{client: client},
		FhirStorePath:      fhirStorePath,
		TempLocation:       tempDir,
		DeadLetterLocation: deadLetterDir,
		ContentStructure:   contentStructure,
	}
	failedImportsDeadLetter := beam.ParDo(s, importer, batchFiles)

	return failedResources, failedImportsDeadLetter
}
// tryFallbackToDataflowTempDirOrPanic resolves a directory for batch files from
// the pipeline options loaded from flags. It tries temp_location first and
// then staging_location, because temp_location is optional. It panics if
// neither option is set.
func tryFallbackToDataflowTempDirOrPanic() string {
	beam.PipelineOptions.LoadOptionsFromFlags(nil)
	for _, option := range []string{"temp_location", "staging_location"} {
		if dir := beam.PipelineOptions.Get(option); dir != "" {
			return dir
		}
	}
	panic("could not resolve to a temp directory for import batch files")
}