blob: 20fdc338c1ec533cd1b145adc677865a9ab90004 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fhirio
import (
"bytes"
"context"
"crypto/rand"
"encoding/json"
"errors"
"flag"
"fmt"
"math/big"
"net/http"
"os"
"strconv"
"strings"
"testing"
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/fhirio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration"
"google.golang.org/api/healthcare/v1"
"google.golang.org/api/option"
)
const (
datasetPathFmt = "projects/%s/locations/%s/datasets/apache-beam-integration-testing"
testDataDir = "../../../../data/fhir_bundles/"
)
var (
storeService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
storeManagementService *healthcare.ProjectsLocationsDatasetsFhirStoresService
)
type fhirStoreInfo struct {
path string
resourcesPaths []string
}
func checkFlags(t *testing.T) {
gcpProjectIsNotSet := gcpopts.Project == nil || *gcpopts.Project == ""
if gcpProjectIsNotSet {
t.Skip("GCP project flag is not set.")
}
gcpRegionIsNotSet := gcpopts.Region == nil || *gcpopts.Region == ""
if gcpRegionIsNotSet {
t.Skip("GCP region flag is not set.")
}
}
func setupFhirStoreWithData(t *testing.T) (fhirStoreInfo, func()) {
return setupFhirStore(t, true)
}
func setupEmptyFhirStore(t *testing.T) (string, func()) {
storeInfo, teardown := setupFhirStore(t, false)
return storeInfo.path, teardown
}
// Sets up a test fhir store by creating and populating data to it for testing
// purposes. It returns the name of the created store path, a slice of the
// resource paths to be used in tests, and a function to teardown what has been
// set up.
func setupFhirStore(t *testing.T, shouldPopulateStore bool) (fhirStoreInfo, func()) {
t.Helper()
if storeService == nil || storeManagementService == nil {
t.Fatal("Healthcare Services were not initialized")
}
healthcareDataset := fmt.Sprintf(datasetPathFmt, *gcpopts.Project, *gcpopts.Region)
createdFhirStore, err := createStore(healthcareDataset)
if err != nil {
t.Fatalf("Test store failed to be created. Reason: %v", err.Error())
}
createdFhirStorePath := createdFhirStore.Name
var resourcePaths []string
if shouldPopulateStore {
resourcePaths = populateStore(createdFhirStorePath)
if len(resourcePaths) == 0 {
t.Fatal("No data got populated to test")
}
}
return fhirStoreInfo{
path: createdFhirStorePath,
resourcesPaths: resourcePaths,
}, func() {
_, _ = deleteStore(createdFhirStorePath)
}
}
func createStore(dataset string) (*healthcare.FhirStore, error) {
randInt, _ := rand.Int(rand.Reader, big.NewInt(32))
testFhirStoreID := "FHIR_store_write_it_" + strconv.FormatInt(time.Now().UnixMilli(), 10) + "_" + randInt.String()
fhirStore := &healthcare.FhirStore{
DisableReferentialIntegrity: true,
EnableUpdateCreate: true,
Version: "R4",
}
return storeManagementService.Create(dataset, fhirStore).FhirStoreId(testFhirStoreID).Do()
}
func deleteStore(storePath string) (*healthcare.Empty, error) {
return storeManagementService.Delete(storePath).Do()
}
// Populates fhir store with data. Note that failure to populate some data is not
// detrimental to the tests, so it is fine to ignore.
func populateStore(storePath string) []string {
resourcePaths := make([]string, 0)
for _, bundle := range readPrettyBundles() {
response, err := storeService.ExecuteBundle(storePath, bytes.NewReader(bundle)).Do()
if err != nil {
continue
}
var body struct {
Entry []struct {
Response struct {
Location string `json:"location"`
Status string `json:"status"`
} `json:"response"`
} `json:"entry"`
}
err = json.NewDecoder(response.Body).Decode(&body)
if err != nil {
continue
}
for _, entry := range body.Entry {
bundleFailedToBeCreated := !strings.Contains(entry.Response.Status, "201")
if bundleFailedToBeCreated {
continue
}
resourcePath, err := extractResourcePathFrom(entry.Response.Location)
if err != nil {
continue
}
resourcePaths = append(resourcePaths, resourcePath)
}
}
return resourcePaths
}
func readPrettyBundles() [][]byte {
files, _ := os.ReadDir(testDataDir)
bundles := make([][]byte, len(files))
for i, file := range files {
bundles[i], _ = os.ReadFile(testDataDir + file.Name())
}
return bundles
}
func extractResourcePathFrom(resourceLocationURL string) (string, error) {
// The resource location url is in the following format:
// https://healthcare.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/STORE_ID/fhir/RESOURCE_NAME/RESOURCE_ID/_history/HISTORY_ID
// But the API calls use this format: projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/fhirStores/STORE_ID/fhir/RESOURCE_NAME/RESOURCE_ID
startIdx := strings.Index(resourceLocationURL, "projects/")
endIdx := strings.Index(resourceLocationURL, "/_history")
if startIdx == -1 || endIdx == -1 {
return "", errors.New("resource location url is invalid")
}
return resourceLocationURL[startIdx:endIdx], nil
}
func readTestTask(t *testing.T, s beam.Scope, testStoreInfo fhirStoreInfo) func() {
t.Helper()
s = s.Scope("fhirio_test.readTestTask")
testResources := append(testStoreInfo.resourcesPaths, testStoreInfo.path+"/fhir/Patient/invalid")
resourcePathsPCollection := beam.CreateList(s, testResources)
resources, failedReads := fhirio.Read(s, resourcePathsPCollection)
passert.Count(s, resources, "", len(testStoreInfo.resourcesPaths))
passert.Count(s, failedReads, "", 1)
return nil
}
func executeBundlesTestTask(t *testing.T, s beam.Scope, testStoreInfo fhirStoreInfo) func() {
t.Helper()
s = s.Scope("fhirio_test.executeBundlesTestTask")
fhirStorePath, teardownFhirStore := setupEmptyFhirStore(t)
bundlesPCollection := beam.CreateList(s, readPrettyBundles())
successBodies, failures := fhirio.ExecuteBundles(s, fhirStorePath, bundlesPCollection)
passert.Count(s, successBodies, "", 2)
passert.Count(s, failures, "", 2)
passert.True(s, failures, func(errorMsg string) bool {
return strings.Contains(errorMsg, strconv.Itoa(http.StatusBadRequest))
})
return teardownFhirStore
}
func searchTestTask(t *testing.T, s beam.Scope, testStoreInfo fhirStoreInfo) func() {
t.Helper()
s = s.Scope("fhirio_test.searchTestTask")
searchQueries := []fhirio.SearchQuery{
{},
{ResourceType: "Patient"},
{ResourceType: "Patient", Parameters: map[string]string{"gender": "female", "family:contains": "Smith"}},
{ResourceType: "Encounter"},
}
searchQueriesCol := beam.CreateList(s, searchQueries)
searchResult, deadLetter := fhirio.Search(s, testStoreInfo.path, searchQueriesCol)
passert.Empty(s, deadLetter)
passert.Count(s, searchResult, "", len(searchQueries))
resourcesFoundCount := beam.ParDo(s, func(identifier string, resourcesFound []string) int {
return len(resourcesFound)
}, searchResult)
passert.Equals(s, resourcesFoundCount, 4, 2, 1, 0)
return nil
}
func deidentifyTestTask(t *testing.T, s beam.Scope, testStoreInfo fhirStoreInfo) func() {
t.Helper()
s = s.Scope("fhirio_test.deidentifyTestTask")
dstFhirStorePath, teardownDstFhirStore := setupEmptyFhirStore(t)
res := fhirio.Deidentify(s, testStoreInfo.path, dstFhirStorePath, &healthcare.DeidentifyConfig{})
passert.Count(s, res, "", 1)
return teardownDstFhirStore
}
func TestFhirIO(t *testing.T) {
integration.CheckFilters(t)
checkFlags(t)
testStoreInfo, teardownFhirStore := setupFhirStoreWithData(t)
defer teardownFhirStore()
p, s := beam.NewPipelineWithRoot()
type testTask func(*testing.T, beam.Scope, fhirStoreInfo) func()
testTasks := []testTask{
readTestTask,
executeBundlesTestTask,
searchTestTask,
deidentifyTestTask,
}
teardownTasks := make([]func(), len(testTasks))
for i, testTaskCallable := range testTasks {
teardownTasks[i] = testTaskCallable(t, s, testStoreInfo)
}
defer func() {
for _, teardown := range teardownTasks {
if teardown != nil {
teardown()
}
}
}()
ptest.RunAndValidate(t, p)
}
func TestMain(m *testing.M) {
flag.Parse()
beam.Init()
healthcareService, err := healthcare.NewService(context.Background(), option.WithUserAgent(fhirio.UserAgent))
if err == nil {
storeService = healthcare.NewProjectsLocationsDatasetsFhirStoresFhirService(healthcareService)
storeManagementService = healthcare.NewProjectsLocationsDatasetsFhirStoresService(healthcareService)
}
ptest.MainRet(m)
}