// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// import is a pipeline example using the fhirio connector to bulk import FHIR
// resources from GCS into a given FHIR store.
//
// Prerequisites:
// 1. NDJSON-encoded FHIR resources stored in GCS (see the example line below).
// 2. Dataflow Runner enabled: https://cloud.google.com/dataflow/docs/quickstarts.
// 3. A Google Cloud FHIR store.
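//
// For reference, each line of the source files should be one complete FHIR
// resource serialized as a single JSON object; a minimal (hypothetical)
// example line:
//
//	{"resourceType": "Patient", "id": "example-patient", "active": true}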
//
// Running this pipeline requires a fully qualified GCS path (which may
// contain wildcards) to where your FHIR resources are stored, and the path of
// the FHIR store the resources should be written to, in addition to the usual
// flags for the Dataflow runner.
//
// An example command for executing this pipeline on GCP is as follows:
// export PROJECT="$(gcloud config get-value project)"
// export TEMP_LOCATION="gs://MY-BUCKET/temp"
// export STAGING_LOCATION="gs://MY-BUCKET/staging"
// export REGION="us-central1"
// export SOURCE_GCS_LOCATION="gs://MY-BUCKET/path/to/resources/**"
// export FHIR_STORE_PATH="MY_FHIR_STORE_PATH"
// cd ./sdks/go
// go run ./examples/fhirio/import/import.go \
//   --runner=dataflow \
//   --temp_location=$TEMP_LOCATION \
//   --staging_location=$STAGING_LOCATION \
//   --project=$PROJECT \
//   --region=$REGION \
//   --worker_harness_container_image=apache/beam_go_sdk:latest \
//   --sourceGcsLocation=$SOURCE_GCS_LOCATION \
//   --fhirStore=$FHIR_STORE_PATH
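//
// For a quick local smoke test (assuming your environment has GCP credentials
// with access to the bucket and the FHIR store), the same pipeline can be run
// on the default direct runner by omitting the Dataflow flags:
// go run ./examples/fhirio/import/import.go \
//   --sourceGcsLocation=$SOURCE_GCS_LOCATION \
//   --fhirStore=$FHIR_STORE_PATH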
package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	// Registers the GCS filesystem so textio can resolve gs:// paths.
	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/fhirio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

var (
	// Required flag with the source directory for GCS files to read, including
	// wildcards. The directory should contain the resource files in NDJSON
	// format, one resource per line.
	sourceGcsLocation = flag.String("sourceGcsLocation", "", "The source directory for GCS files to read, including wildcards.")

	// Required flag with the target FHIR store to write data to, in the full format:
	// "projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/FHIR_STORE_ID"
	fhirStore = flag.String("fhirStore", "", "The target FHIR store to write data to, in the full format projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/FHIR_STORE_ID.")
)

func main() {
	flag.Parse()
	beam.Init()

	p, s := beam.NewPipelineWithRoot()

	// Read resources from GCS.
	resourcesInGcs := textio.Read(s, *sourceGcsLocation)
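	// At this point resourcesInGcs is a PCollection of strings, one element
	// per line of the matched files, i.e. one serialized FHIR resource each.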

	// Import the read resources into the provided FHIR store. The two empty
	// string arguments leave the connector's optional temp and dead-letter
	// GCS locations unset, and ContentStructureResource indicates that each
	// input line holds a complete FHIR resource.
	fhirio.Import(s, *fhirStore, "", "", fhirio.ContentStructureResource, resourcesInGcs)

	ctx := context.Background()
	if err := beamx.Run(ctx, p); err != nil {
		log.Fatalf(ctx, "Failed to execute job: %v", err)
	}
}