#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Integration test for Google Cloud DICOM IO connector.
This e2e test will first create a temporary empty DICOM storage and send 18
DICOM files from gs://apache-beam-samples/healthcare/dicom/io_test_files to
it. The test will compare the metadata of a persistent DICOM storage, which
reprensets ground truths and has 18 files stored, to the temporary storage
in order to check if the connectors are functioning correctly.
"""
# pytype: skip-file
from __future__ import absolute_import
import random
import string
import sys
import unittest
from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
# pylint: disable=wrong-import-order, wrong-import-position
try:
from apache_beam.io.gcp.dicomclient import DicomApiHttpClient
from apache_beam.io.gcp.dicomio import DicomSearch
from apache_beam.io.gcp.dicomio import UploadToDicomStore
from google.auth import default
from google.auth.transport import requests
except ImportError:
DicomSearch = None
# pylint: enable=wrong-import-order, wrong-import-position
REGION = 'us-central1'
DATA_SET_ID = 'apache-beam-integration-testing'
HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
GCS_BASE_URL = 'https://storage.googleapis.com/storage/v1'
PERSISTENT_DICOM_STORE_NAME = 'dicom_it_persistent_store'
BUCKET_NAME = 'apache-beam-samples'
DICOM_DIR_PATH = 'healthcare/dicom'
DICOM_FILES_PATH = 'gs://' + BUCKET_NAME + '/' + DICOM_DIR_PATH
METADATA_DIR_PATH = DICOM_DIR_PATH + '/io_test_metadata/'
META_DATA_ALL_NAME = 'Dicom_io_it_test_data.json'
META_DATA_REFINED_NAME = 'Dicom_io_it_test_refined_data.json'
NUM_INSTANCE = 18
RAND_LEN = 15
def random_string_generator(length):
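  """Return a random alphanumeric string of the given length."""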
letters_and_digits = string.ascii_letters + string.digits
result = ''.join((random.choice(letters_and_digits) for i in range(length)))
return result
def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
  # Create an empty DICOM store.
credential, _ = default()
session = requests.AuthorizedSession(credential)
api_endpoint = "{}/projects/{}/locations/{}".format(
HEALTHCARE_BASE_URL, project_id, region)
# base of dicomweb path.
dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
response = session.post(
dicomweb_path, params={"dicomStoreId": dicom_store_id})
response.raise_for_status()
return response.status_code
def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
# Delete an existing DICOM store
credential, _ = default()
session = requests.AuthorizedSession(credential)
api_endpoint = "{}/projects/{}/locations/{}".format(
HEALTHCARE_BASE_URL, project_id, region)
# base of dicomweb path.
dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
api_endpoint, dataset_id, dicom_store_id)
response = session.delete(dicomweb_path)
response.raise_for_status()
return response.status_code
def get_gcs_file_http(file_name):
  # Fetch a GCS object via the storage JSON API; slashes in the object
  # name must be URL-encoded.
file_name = file_name.replace('/', '%2F')
api_endpoint = "{}/b/{}/o/{}?alt=media".format(
GCS_BASE_URL, BUCKET_NAME, file_name)
credential, _ = default()
session = requests.AuthorizedSession(credential)
response = session.get(api_endpoint)
response.raise_for_status()
return response.json()
@unittest.skipIf(DicomSearch is None, 'GCP dependencies are not installed')
class DICOMIoIntegrationTest(unittest.TestCase):
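  """Integration tests for DicomSearch and UploadToDicomStore."""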
def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
self.project = self.test_pipeline.get_option('project')
self.expected_output_all_metadata = get_gcs_file_http(
METADATA_DIR_PATH + META_DATA_ALL_NAME)
self.expected_output_refined_metadata = get_gcs_file_http(
METADATA_DIR_PATH + META_DATA_REFINED_NAME)
    # Create a temporary DICOM store with a randomized name.
self.temp_dicom_store = "DICOM_store_" + random_string_generator(RAND_LEN)
create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
def tearDown(self):
    # Clean up the temporary DICOM store.
delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
@attr('IT')
def test_dicom_search_instances(self):
    # Search and compare the metadata of a persistent DICOM store.
    # Both refined and comprehensive searches are tested.
    input_dict_all = {
        'project_id': self.project,
        'region': REGION,
        'dataset_id': DATA_SET_ID,
        'dicom_store_id': PERSISTENT_DICOM_STORE_NAME,
        'search_type': 'instances',
    }

    input_dict_refine = {
        'project_id': self.project,
        'region': REGION,
        'dataset_id': DATA_SET_ID,
        'dicom_store_id': PERSISTENT_DICOM_STORE_NAME,
        'search_type': 'instances',
        'params': {
            'StudyInstanceUID': 'study_000000001', 'limit': 500, 'offset': 0
        },
    }
    expected_dict_all = {
        'result': self.expected_output_all_metadata,
        'status': 200,
        'input': input_dict_all,
        'success': True,
    }

    expected_dict_refine = {
        'result': self.expected_output_refined_metadata,
        'status': 200,
        'input': input_dict_refine,
        'success': True,
    }
with self.test_pipeline as p:
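      # Run both searches in one pipeline; DicomSearch emits one result
      # dict per input query dict.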
results_all = (
p
| 'create all dict' >> beam.Create([input_dict_all])
| 'search all' >> DicomSearch())
results_refine = (
p
| 'create refine dict' >> beam.Create([input_dict_refine])
| 'search refine' >> DicomSearch())
assert_that(
results_all, equal_to([expected_dict_all]), label='all search assert')
assert_that(
results_refine,
equal_to([expected_dict_refine]),
label='refine search assert')
@attr('IT')
def test_dicom_store_instance_from_gcs(self):
    # Store DICOM files from a GCS bucket into an empty DICOM store,
    # then check that the store metadata matches.
    input_dict_store = {
        'project_id': self.project,
        'region': REGION,
        'dataset_id': DATA_SET_ID,
        'dicom_store_id': self.temp_dicom_store,
    }
expected_output = [True] * NUM_INSTANCE
with self.test_pipeline as p:
gcs_path = DICOM_FILES_PATH + "/io_test_files/*"
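      # MatchFiles expands the glob, ReadMatches yields a ReadableFile per
      # match, and UploadToDicomStore ('fileio' mode) stores each file's
      # bytes in the DICOM store, emitting one result dict per instance.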
results = (
p
| fileio.MatchFiles(gcs_path)
| fileio.ReadMatches()
| UploadToDicomStore(input_dict_store, 'fileio')
| beam.Map(lambda x: x['success']))
assert_that(
results, equal_to(expected_output), label='store first assert')
    # Check the metadata using the DICOM API client.
result, status_code = DicomApiHttpClient().qido_search(
self.project, REGION, DATA_SET_ID, self.temp_dicom_store, 'instances'
)
self.assertEqual(status_code, 200)
    # The list-comparison assertion method differs between Python 2 and 3.
if sys.version_info.major == 3:
self.assertCountEqual(result, self.expected_output_all_metadata)
else:
self.assertItemsEqual(result, self.expected_output_all_metadata)
if __name__ == '__main__':
unittest.main()