Merge pull request #12473 from [BEAM-10601] DICOM API Beam IO connector e2e test

* add integration test

* fix lint

* fix style and update CHANGES.md

* resolve comments and fix a dependency bug

* fix dependency

* fix dependency

* add documentation, use the right storage, and reduce the number of pipelines

* add comment

* modify client a little
diff --git a/CHANGES.md b/CHANGES.md
index c3b7959..7e15256 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
 * Add support of AWS SDK v2 for KinesisIO.Read (Java) ([BEAM-9702](https://issues.apache.org/jira/browse/BEAM-9702)).
 * Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Add streaming support to SnowflakeIO in Java SDK ([BEAM-9896](https://issues.apache.org/jira/browse/BEAM-9896  ))
+* Support reading and writing to Google Healthcare DICOM APIs in Python SDK ([BEAM-10601](https://issues.apache.org/jira/browse/BEAM-10601)).
 
 ## New Features / Improvements
 
diff --git a/sdks/python/apache_beam/io/gcp/dicomclient.py b/sdks/python/apache_beam/io/gcp/dicomclient.py
index e38a310..dbca17c 100644
--- a/sdks/python/apache_beam/io/gcp/dicomclient.py
+++ b/sdks/python/apache_beam/io/gcp/dicomclient.py
@@ -76,7 +76,6 @@
     while True:
       params['offset'] = offset
       response = session.get(dicomweb_path, headers=headers, params=params)
-      response.raise_for_status()
       status = response.status_code
       if status != 200:
         if offset == 0:
@@ -84,7 +83,6 @@
         params['offset'] = offset - 1
         params['limit'] = 1
         response = session.get(dicomweb_path, headers=headers, params=params)
-        response.raise_for_status()
         check_status = response.status_code
         if check_status == 200:
           # if the number of results equals to page size
@@ -123,6 +121,5 @@
     headers = {"Content-Type": content_type}
 
     response = session.post(dicomweb_path, data=dcm_file, headers=headers)
-    response.raise_for_status()
 
     return None, response.status_code
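Note: with raise_for_status() removed, the client surfaces HTTP failures
through the returned status code instead of raising mid-pipeline. A minimal
caller-side sketch (the project, dataset, and store names below are
placeholders; the call signature follows the integration test further down):

    from apache_beam.io.gcp.dicomclient import DicomApiHttpClient

    # qido_search returns (result, status_code) rather than raising on errors.
    result, status_code = DicomApiHttpClient().qido_search(
        'my-project', 'us-central1', 'my-dataset', 'my-store', 'instances')
    if status_code != 200:
      # Handle the failure explicitly; the client no longer throws here.
      print('QIDO search failed with HTTP status %d' % status_code)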
diff --git a/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py b/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
new file mode 100644
index 0000000..f0faac4
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
@@ -0,0 +1,230 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+This e2e test first creates a temporary, empty DICOM store and sends 18
+DICOM files from gs://apache-beam-samples/healthcare/dicom/io_test_files to
+it. The test then compares the metadata of a persistent DICOM store, which
+represents the ground truth and has the same 18 files stored, to the
+temporary store in order to check that the connectors function correctly.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import random
+import string
+import sys
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.io.gcp.dicomclient import DicomApiHttpClient
+  from apache_beam.io.gcp.dicomio import DicomSearch
+  from apache_beam.io.gcp.dicomio import UploadToDicomStore
+  from google.auth import default
+  from google.auth.transport import requests
+except ImportError:
+  DicomSearch = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+GCS_BASE_URL = 'https://storage.googleapis.com/storage/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+BUCKET_NAME = 'apache-beam-samples'
+DICOM_DIR_PATH = 'healthcare/dicom'
+DICOM_FILES_PATH = 'gs://' + BUCKET_NAME + '/' + DICOM_DIR_PATH
+METADATA_DIR_PATH = DICOM_DIR_PATH + '/io_test_metadata/'
+META_DATA_ALL_NAME = 'Dicom_io_it_test_data.json'
+META_DATA_REFINED_NAME = 'Dicom_io_it_test_refined_data.json'
+NUM_INSTANCE = 18
+RAND_LEN = 15
+
+
+def random_string_generator(length):
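+  # A random alphanumeric suffix keeps temporary DICOM store IDs unique.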
+  letters_and_digits = string.ascii_letters + string.digits
+  result = ''.join((random.choice(letters_and_digits) for i in range(length)))
+  return result
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store.
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+def get_gcs_file_http(file_name):
+  # Download a JSON file from GCS via the storage REST API.
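+  # A '/' in the object name must be percent-encoded in the request path.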
+  file_name = file_name.replace('/', '%2F')
+  api_endpoint = "{}/b/{}/o/{}?alt=media".format(
+      GCS_BASE_URL, BUCKET_NAME, file_name)
+
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+
+  response = session.get(api_endpoint)
+  response.raise_for_status()
+  return response.json()
+
+
+@unittest.skipIf(DicomSearch is None, 'GCP dependencies are not installed')
+class DICOMIoIntegrationTest(unittest.TestCase):
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    self.expected_output_all_metadata = get_gcs_file_http(
+        METADATA_DIR_PATH + META_DATA_ALL_NAME)
+    self.expected_output_refined_metadata = get_gcs_file_http(
+        METADATA_DIR_PATH + META_DATA_REFINED_NAME)
+
+    # Create a temp DICOM store with a random suffix to avoid name collisions.
+    self.temp_dicom_store = "DICOM_store_" + random_string_generator(RAND_LEN)
+    create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  def tearDown(self):
+    # Clean up the temp DICOM store.
+    delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search_instances(self):
+    # Search and compare the metadata of a persistent DICOM store.
+    # Both refined and comprehensive searches are tested.
+    input_dict_all = {}
+    input_dict_all['project_id'] = self.project
+    input_dict_all['region'] = REGION
+    input_dict_all['dataset_id'] = DATA_SET_ID
+    input_dict_all['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+    input_dict_all['search_type'] = "instances"
+
+    input_dict_refine = {}
+    input_dict_refine['project_id'] = self.project
+    input_dict_refine['region'] = REGION
+    input_dict_refine['dataset_id'] = DATA_SET_ID
+    input_dict_refine['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+    input_dict_refine['search_type'] = "instances"
+    input_dict_refine['params'] = {
+        'StudyInstanceUID': 'study_000000001', 'limit': 500, 'offset': 0
+    }
+
+    expected_dict_all = {}
+    expected_dict_all['result'] = self.expected_output_all_metadata
+    expected_dict_all['status'] = 200
+    expected_dict_all['input'] = input_dict_all
+    expected_dict_all['success'] = True
+
+    expected_dict_refine = {}
+    expected_dict_refine['result'] = self.expected_output_refined_metadata
+    expected_dict_refine['status'] = 200
+    expected_dict_refine['input'] = input_dict_refine
+    expected_dict_refine['success'] = True
+
+    with self.test_pipeline as p:
+      results_all = (
+          p
+          | 'create all dict' >> beam.Create([input_dict_all])
+          | 'search all' >> DicomSearch())
+      results_refine = (
+          p
+          | 'create refine dict' >> beam.Create([input_dict_refine])
+          | 'search refine' >> DicomSearch())
+
+      assert_that(
+          results_all, equal_to([expected_dict_all]), label='all search assert')
+      assert_that(
+          results_refine,
+          equal_to([expected_dict_refine]),
+          label='refine search assert')
+
+  @attr('IT')
+  def test_dicom_store_instance_from_gcs(self):
+    # Store DICOM files to an empty DICOM store from a GCS bucket,
+    # then check that the store metadata matches.
+    input_dict_store = {}
+    input_dict_store['project_id'] = self.project
+    input_dict_store['region'] = REGION
+    input_dict_store['dataset_id'] = DATA_SET_ID
+    input_dict_store['dicom_store_id'] = self.temp_dicom_store
+
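+    # Every one of the NUM_INSTANCE uploaded files is expected to succeed.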
+    expected_output = [True] * NUM_INSTANCE
+
+    with self.test_pipeline as p:
+      gcs_path = DICOM_FILES_PATH + "/io_test_files/*"
+      results = (
+          p
+          | fileio.MatchFiles(gcs_path)
+          | fileio.ReadMatches()
+          | UploadToDicomStore(input_dict_store, 'fileio')
+          | beam.Map(lambda x: x['success']))
+      assert_that(
+          results, equal_to(expected_output), label='store first assert')
+
+    # Check the metadata using the client directly.
+    result, status_code = DicomApiHttpClient().qido_search(
+      self.project, REGION, DATA_SET_ID, self.temp_dicom_store, 'instances'
+    )
+
+    self.assertEqual(status_code, 200)
+
+    # Order-insensitive comparison; assertItemsEqual was renamed in Python 3.
+    if sys.version_info.major == 3:
+      self.assertCountEqual(result, self.expected_output_all_metadata)
+    else:
+      self.assertItemsEqual(result, self.expected_output_all_metadata)
+
+
+if __name__ == '__main__':
+  unittest.main()
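Note: these tests carry the IT attribute and require a real GCP project with
access to the apache-beam-integration-testing dataset. One way to run them
locally (assuming application-default credentials; my-project is a
placeholder, and the exact options depend on your environment):

    python setup.py nosetests \
        --tests apache_beam.io.gcp.dicomio_integration_test \
        --test-pipeline-options="--runner=TestDirectRunner --project=my-project"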