Merge pull request #12473 from [BEAM-10601] DICOM API Beam IO connector e2e test
* add integration test
* fix lint
* fix style and update changes.md
* resolved comments and fix dependency bug
* fix dependency
* fix dependency
* add documentation, use the right storage and reduce number of pipelines.
* add comment
* modify client a little
diff --git a/CHANGES.md b/CHANGES.md
index c3b7959..7e15256 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
* Add support of AWS SDK v2 for KinesisIO.Read (Java) ([BEAM-9702](https://issues.apache.org/jira/browse/BEAM-9702)).
* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Add streaming support to SnowflakeIO in Java SDK ([BEAM-9896](https://issues.apache.org/jira/browse/BEAM-9896 ))
+* Support reading and writing to Google Healthcare DICOM APIs in Python SDK ([BEAM-10601](https://issues.apache.org/jira/browse/BEAM-10601))
## New Features / Improvements
diff --git a/sdks/python/apache_beam/io/gcp/dicomclient.py b/sdks/python/apache_beam/io/gcp/dicomclient.py
index e38a310..dbca17c 100644
--- a/sdks/python/apache_beam/io/gcp/dicomclient.py
+++ b/sdks/python/apache_beam/io/gcp/dicomclient.py
@@ -76,7 +76,6 @@
while True:
params['offset'] = offset
response = session.get(dicomweb_path, headers=headers, params=params)
- response.raise_for_status()
status = response.status_code
if status != 200:
if offset == 0:
@@ -84,7 +83,6 @@
params['offset'] = offset - 1
params['limit'] = 1
response = session.get(dicomweb_path, headers=headers, params=params)
- response.raise_for_status()
check_status = response.status_code
if check_status == 200:
# if the number of results equals to page size
@@ -123,6 +121,5 @@
headers = {"Content-Type": content_type}
response = session.post(dicomweb_path, data=dcm_file, headers=headers)
- response.raise_for_status()
return None, response.status_code
diff --git a/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py b/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
new file mode 100644
index 0000000..f0faac4
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
@@ -0,0 +1,227 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+This e2e test will first create a temporary empty DICOM storage and send 18
+DICOM files from gs://apache-beam-samples/healthcare/dicom/io_test_files to
+it. The test will compare the metadata of a persistent DICOM storage, which
+represents ground truths and has 18 files stored, to the temporary storage
+in order to check if the connectors are functioning correctly.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import random
+import string
+import sys
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+ from apache_beam.io.gcp.dicomclient import DicomApiHttpClient
+ from apache_beam.io.gcp.dicomio import DicomSearch
+ from apache_beam.io.gcp.dicomio import UploadToDicomStore
+ from google.auth import default
+ from google.auth.transport import requests
+except ImportError:
+ DicomSearch = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+GCS_BASE_URL = 'https://storage.googleapis.com/storage/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+BUCKET_NAME = 'apache-beam-samples'
+DICOM_DIR_PATH = 'healthcare/dicom'
+DICOM_FILES_PATH = 'gs://' + BUCKET_NAME + '/' + DICOM_DIR_PATH
+METADATA_DIR_PATH = DICOM_DIR_PATH + '/io_test_metadata/'
+META_DATA_ALL_NAME = 'Dicom_io_it_test_data.json'
+META_DATA_REFINED_NAME = 'Dicom_io_it_test_refined_data.json'
+NUM_INSTANCE = 18
+RAND_LEN = 15
+
+
+def random_string_generator(length):
+ letters_and_digits = string.ascii_letters + string.digits
+ result = ''.join((random.choice(letters_and_digits) for i in range(length)))
+ return result
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+ # Create an empty DICOM store
+ credential, _ = default()
+ session = requests.AuthorizedSession(credential)
+ api_endpoint = "{}/projects/{}/locations/{}".format(
+ HEALTHCARE_BASE_URL, project_id, region)
+
+ # base of dicomweb path.
+ dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+ response = session.post(
+ dicomweb_path, params={"dicomStoreId": dicom_store_id})
+ response.raise_for_status()
+ return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+ # Delete an existing DICOM store
+ credential, _ = default()
+ session = requests.AuthorizedSession(credential)
+ api_endpoint = "{}/projects/{}/locations/{}".format(
+ HEALTHCARE_BASE_URL, project_id, region)
+
+ # base of dicomweb path.
+ dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+ api_endpoint, dataset_id, dicom_store_id)
+
+ response = session.delete(dicomweb_path)
+ response.raise_for_status()
+ return response.status_code
+
+
+def get_gcs_file_http(file_name):
+ # Get gcs file from REST Api
+ file_name = file_name.replace('/', '%2F')
+ api_endpoint = "{}/b/{}/o/{}?alt=media".format(
+ GCS_BASE_URL, BUCKET_NAME, file_name)
+
+ credential, _ = default()
+ session = requests.AuthorizedSession(credential)
+
+ response = session.get(api_endpoint)
+ response.raise_for_status()
+ return response.json()
+
+
+@unittest.skipIf(DicomSearch is None, 'GCP dependencies are not installed')
+class DICOMIoIntegrationTest(unittest.TestCase):
+ def setUp(self):
+ self.test_pipeline = TestPipeline(is_integration_test=True)
+ self.project = self.test_pipeline.get_option('project')
+ self.expected_output_all_metadata = get_gcs_file_http(
+ METADATA_DIR_PATH + META_DATA_ALL_NAME)
+ self.expected_output_refined_metadata = get_gcs_file_http(
+ METADATA_DIR_PATH + META_DATA_REFINED_NAME)
+
+ # create a temp Dicom store based on the time stamp
+ self.temp_dicom_store = "DICOM_store_" + random_string_generator(RAND_LEN)
+ create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+ def tearDown(self):
+ # clean up the temp Dicom store
+ delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+ @attr('IT')
+ def test_dicom_search_instances(self):
+ # Search and compare the metadata of a persistent DICOM store.
+ # Both refine and comprehensive search will be tested.
+ input_dict_all = {}
+ input_dict_all['project_id'] = self.project
+ input_dict_all['region'] = REGION
+ input_dict_all['dataset_id'] = DATA_SET_ID
+ input_dict_all['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+ input_dict_all['search_type'] = "instances"
+
+ input_dict_refine = {}
+ input_dict_refine['project_id'] = self.project
+ input_dict_refine['region'] = REGION
+ input_dict_refine['dataset_id'] = DATA_SET_ID
+ input_dict_refine['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+ input_dict_refine['search_type'] = "instances"
+ input_dict_refine['params'] = {
+ 'StudyInstanceUID': 'study_000000001', 'limit': 500, 'offset': 0
+ }
+
+ expected_dict_all = {}
+ expected_dict_all['result'] = self.expected_output_all_metadata
+ expected_dict_all['status'] = 200
+ expected_dict_all['input'] = input_dict_all
+ expected_dict_all['success'] = True
+
+ expected_dict_refine = {}
+ expected_dict_refine['result'] = self.expected_output_refined_metadata
+ expected_dict_refine['status'] = 200
+ expected_dict_refine['input'] = input_dict_refine
+ expected_dict_refine['success'] = True
+
+ with self.test_pipeline as p:
+ results_all = (
+ p
+ | 'create all dict' >> beam.Create([input_dict_all])
+ | 'search all' >> DicomSearch())
+ results_refine = (
+ p
+ | 'create refine dict' >> beam.Create([input_dict_refine])
+ | 'search refine' >> DicomSearch())
+
+ assert_that(
+ results_all, equal_to([expected_dict_all]), label='all search assert')
+ assert_that(
+ results_refine,
+ equal_to([expected_dict_refine]),
+ label='refine search assert')
+
+ @attr('IT')
+ def test_dicom_store_instance_from_gcs(self):
+ # Store DICOM files to an empty DICOM store from a GCS bucket,
+ # then check if the store metadata match.
+ input_dict_store = {}
+ input_dict_store['project_id'] = self.project
+ input_dict_store['region'] = REGION
+ input_dict_store['dataset_id'] = DATA_SET_ID
+ input_dict_store['dicom_store_id'] = self.temp_dicom_store
+
+ expected_output = [True] * NUM_INSTANCE
+
+ with self.test_pipeline as p:
+ gcs_path = DICOM_FILES_PATH + "/io_test_files/*"
+ results = (
+ p
+ | fileio.MatchFiles(gcs_path)
+ | fileio.ReadMatches()
+ | UploadToDicomStore(input_dict_store, 'fileio')
+ | beam.Map(lambda x: x['success']))
+ assert_that(
+ results, equal_to(expected_output), label='store first assert')
+
+ # Check the metadata using client
+ result, status_code = DicomApiHttpClient().qido_search(
+ self.project, REGION, DATA_SET_ID, self.temp_dicom_store, 'instances'
+ )
+
+ self.assertEqual(status_code, 200)
+
+ # List comparison based on different version of python
+ if sys.version_info.major == 3:
+ self.assertCountEqual(result, self.expected_output_all_metadata)
+ else:
+ self.assertItemsEqual(result, self.expected_output_all_metadata)
+
+
+if __name__ == '__main__':
+ unittest.main()