// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// read_write_pubsub is an example pipeline that uses the fhirio connector to
// read FHIR resources from GCS and write them to a GCP FHIR store. If a PubSub
// topic is provided, it also listens for the store's PubSub update
// notifications, reads the written resources back from the FHIR store, and
// logs them.
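//
// Roughly, the pipeline's shape is:
//
//	GCS (NDJSON) -> WrapInBundle -> fhirio.ExecuteBundles -> FHIR store
//	PubSub notifications -> fhirio.Read -> log read resources/errors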
//
// Prerequisites:
// 1. NDJSON-encoded FHIR resources stored in GCS.
// 2. Dataflow Runner enabled: https://cloud.google.com/dataflow/docs/quickstarts.
// 3. A Google Cloud FHIR store, optionally with PubSub notifications set up
//    (see: https://cloud.google.com/healthcare-api/docs/concepts/pubsub).
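//
// Each line of a source file holds exactly one resource. A (hypothetical)
// line might look like:
//
//	{"resourceType": "Patient", "name": [{"family": "Smith", "given": ["Darcy"]}]}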
//
// Running this pipeline requires providing a fully qualified GCS address
// (potentially containing wildcards) of where your FHIR resources are stored,
// the path of the FHIR store the resources should be written to, and,
// optionally, the name of the PubSub topic your FHIR store sends notifications
// to, in addition to the usual flags for the Dataflow runner.
//
// An example command for executing this pipeline on GCP is as follows:
// export PROJECT="$(gcloud config get-value project)"
// export TEMP_LOCATION="gs://MY-BUCKET/temp"
// export STAGING_LOCATION="gs://MY-BUCKET/staging"
// export REGION="us-central1"
// export SOURCE_GCS_LOCATION="gs://MY-BUCKET/path/to/resources/**"
// export FHIR_STORE_PATH="MY_FHIR_STORE_PATH"
// export PUBSUB_TOPIC="MY_FHIR_STORE_TOPIC"
// cd ./sdks/go
// go run ./examples/fhirio/read_write_pubsub/read_write_pubsub.go \
// --runner=dataflow \
// --temp_location=$TEMP_LOCATION \
// --staging_location=$STAGING_LOCATION \
// --project=$PROJECT \
// --region=$REGION \
// --worker_harness_container_image=apache/beam_go_sdk:latest \
// --sourceGcsLocation=$SOURCE_GCS_LOCATION \
// --fhirStore=$FHIR_STORE_PATH \
// --pubsubTopic=$PUBSUB_TOPIC
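//
// For a quick local sanity check on a small sample, the same pipeline can be
// run with the direct runner (a sketch, assuming your application default
// credentials can access the bucket and the FHIR store; leave the PubSub
// topic unset, as that branch of the pipeline is skipped without it):
// go run ./examples/fhirio/read_write_pubsub/read_write_pubsub.go \
// --runner=direct \
// --project=$PROJECT \
// --sourceGcsLocation=$SOURCE_GCS_LOCATION \
// --fhirStore=$FHIR_STORE_PATH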
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"strings"
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/fhirio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

var (
	// Required flag with the source GCS location of the files to read, which may
	// include wildcards. The location should contain the resource files in
	// NDJSON format.
	sourceGcsLocation = flag.String("sourceGcsLocation", "", "The source directory for GCS files to read, including wildcards.")
	// Required flag with the target FHIR store to write data to. Must be of the full format:
	// "projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/FHIR_STORE_ID"
	fhirStore = flag.String("fhirStore", "", "The target FHIR store to write data to, in the full format.")
// Optional flag with the pubsub topic of your FHIR store to read and log upon store updates.
pubsubTopic = flag.String("pubsubTopic", "", "The PubSub topic to listen to.")
)

func init() {
	register.Function1x2[string, string, error](WrapInBundle)
	register.DoFn2x0[context.Context, string](&LoggerFn{})
}

// WrapInBundle takes a FHIR resource string and wraps it in a batch Bundle
// resource so it can be published through ExecuteBundles. It returns an error
// if the resource cannot be parsed as JSON.
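//
// For example, a (hypothetical) input of
//
//	{"resourceType": "Patient", "name": [{"family": "Smith"}]}
//
// is wrapped into a batch Bundle containing a single POST entry against the
// "Patient" URL.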
func WrapInBundle(resource string) (string, error) {
	var r struct {
		ResourceType string `json:"resourceType"`
	}
	// Extract the resource type so the bundle entry can target the right URL.
	if err := json.Unmarshal([]byte(resource), &r); err != nil {
		return "", fmt.Errorf("failed to parse resource: %w", err)
	}
return fmt.Sprintf(`{
"resourceType": "Bundle",
"type": "batch",
"entry": [
{
"request": {
"method": "POST",
"url": "%s"
},
"resource": %s
}
]
	}`, r.ResourceType, resource), nil
}

// LoggerFn is a helper DoFn to log elements received.
type LoggerFn struct {
LogPrefix string
}

// ProcessElement logs each element it receives.
func (fn *LoggerFn) ProcessElement(ctx context.Context, elm string) {
log.Infof(ctx, "%s: %v", fn.LogPrefix, elm)
}

// FinishBundle waits a bit so the job server finishes receiving logs.
func (fn *LoggerFn) FinishBundle() {
time.Sleep(2 * time.Second)
}

func main() {
	flag.Parse()
	beam.Init()

	p, s := beam.NewPipelineWithRoot()

	// Read the NDJSON-encoded resources from GCS, one resource per element.
	resourcesInGcs := textio.Read(s, *sourceGcsLocation)
	resourceBundles := beam.ParDo(s, WrapInBundle, resourcesInGcs)

	// Write the resources to the store, logging any failed writes.
	_, failedWritesErrorMessage := fhirio.ExecuteBundles(s, *fhirStore, resourceBundles)
	beam.ParDo0(s, &LoggerFn{"Failed Write"}, failedWritesErrorMessage)

	if *pubsubTopic != "" {
		// Once a resource is written to the store, a PubSub notification is
		// emitted containing the resource's path. Read the notifications and
		// fetch the corresponding resources back from the store.
		resourceNotifications := pubsubio.Read(s, *gcpopts.Project, *pubsubTopic, nil)
		resourcesInFhirStore, deadLetters := fhirio.Read(s, resourceNotifications)

		// Log the resources that were read back, and any read errors.
		beam.ParDo0(s, &LoggerFn{"Read Resource"}, resourcesInFhirStore)
		beam.ParDo0(s, &LoggerFn{"Got Dead Letter"}, deadLetters)
	}

ctx := context.Background()
if err := beamx.Run(ctx, p); err != nil {
log.Fatalf(ctx, "Failed to execute job: %v", err)
}
}